| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| //! Asynchronous implementation of Avro file reader. |
| //! |
| //! This module provides [`AsyncAvroFileReader`], which supports reading and decoding |
| //! the Avro OCF format from any source that implements [`AsyncFileReader`]. |
| |
| use crate::compression::CompressionCodec; |
| use crate::reader::Decoder; |
| use crate::reader::block::{BlockDecoder, BlockDecoderState}; |
| use arrow_array::RecordBatch; |
| use arrow_schema::{ArrowError, SchemaRef}; |
| use bytes::Bytes; |
| use futures::future::BoxFuture; |
| use futures::{FutureExt, Stream}; |
| use std::mem; |
| use std::ops::Range; |
| use std::pin::Pin; |
| use std::task::{Context, Poll}; |
| |
| mod async_file_reader; |
| mod builder; |
| |
| pub use async_file_reader::AsyncFileReader; |
| pub use builder::{ReaderBuilder, read_header_info}; |
| |
| #[cfg(feature = "object_store")] |
| mod store; |
| |
| use crate::errors::AvroError; |
| #[cfg(feature = "object_store")] |
| pub use store::AvroObjectReader; |
| |
/// Tells `read_next` how to interpret the bytes once a pending fetch completes.
enum FetchNextBehaviour {
    /// Initial read: scan for sync marker, then move to decoding blocks
    ReadSyncMarker,
    /// Parse VLQ header bytes one at a time until Data state, then continue decoding
    DecodeVLQHeader,
    /// Continue decoding the current block with the fetched data
    ContinueDecoding,
}
| |
/// State machine for [`AsyncAvroFileReader`]. Ownership of the underlying
/// reader `R` travels with the states that may still need to fetch bytes.
enum ReaderState<R> {
    /// Transient placeholder left behind while the real state (and ownership
    /// of `R`) is moved out via `mem::replace`; observing it in `read_next`
    /// is treated as an error.
    InvalidState,
    /// Initial state, fetch initial range
    Idle { reader: R },
    /// Fetching data from the reader
    FetchingData {
        /// Resolves to the reader (handed back so it can be reused) plus the fetched bytes.
        future: BoxFuture<'static, Result<(R, Bytes), AvroError>>,
        /// What to do with the fetched bytes once the future completes.
        next_behaviour: FetchNextBehaviour,
    },
    /// Decode a block in a loop until completion
    DecodingBlock { data: Bytes, reader: R },
    /// Output batches from a decoded block
    ReadingBatches {
        /// Bytes not yet given to the block decoder (start of the next block).
        data: Bytes,
        /// (Decompressed) contents of the current block.
        block_data: Bytes,
        /// Records of the current block still to be decoded into batches.
        remaining_in_block: usize,
        reader: R,
    },
    /// Successfully finished reading file contents; drain any remaining buffered records
    /// from the decoder into (possibly partial) output batches.
    Flushing,
    /// Terminal state (reached after success *or* error): every later poll yields `None`.
    Finished,
}
| |
/// An asynchronous Avro file reader that implements `Stream<Item = Result<RecordBatch, ArrowError>>`.
/// This uses an [`AsyncFileReader`] to fetch data ranges as needed, starting with fetching the header,
/// then reading all the blocks in the provided range where it:
/// 1. Reads and decodes data until the header is fully decoded.
/// 2. Searches from `range.start` for the first sync marker, and starts with the following block.
///    (If `range.start` is less than the header length, we start at the header length minus the sync marker bytes)
/// 3. Reads blocks sequentially, decoding them into RecordBatches.
/// 4. If a block is incomplete (due to range ending mid-block), fetches the remaining bytes from the [`AsyncFileReader`].
/// 5. If no range was originally provided, reads the full file.
/// 6. If the range is 0, file_size is 0, or `range.end` is less than the header length, finishes immediately.
///
/// # Example
///
/// ```
/// #[tokio::main(flavor = "current_thread")]
/// async fn main() -> Result<(), Box<dyn std::error::Error>> {
///     use std::io::Cursor;
///     use std::sync::Arc;
///     use arrow_array::{ArrayRef, Int32Array, RecordBatch};
///     use arrow_schema::{DataType, Field, Schema};
///     use arrow_avro::reader::AsyncAvroFileReader;
///     use arrow_avro::writer::AvroWriter;
///     use futures::TryStreamExt;
///
///     // Build a minimal Arrow schema and batch
///     let schema = Schema::new(vec![Field::new("id", DataType::Int32, false)]);
///     let batch = RecordBatch::try_new(
///         Arc::new(schema.clone()),
///         vec![Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef],
///     )?;
///
///     // Write an Avro OCF to memory
///     let buffer: Vec<u8> = Vec::new();
///     let mut writer = AvroWriter::new(buffer, schema)?;
///     writer.write(&batch)?;
///     writer.finish()?;
///     let bytes = writer.into_inner();
///
///     // Create an async reader from the in-memory bytes
///     // `tokio::fs::File` also implements `AsyncFileReader` for reading from disk
///     let file_size = bytes.len();
///     let cursor = Cursor::new(bytes);
///     let reader = AsyncAvroFileReader::builder(cursor, file_size as u64, 1024)
///         .try_build()
///         .await?;
///
///     // Consume the stream of RecordBatches
///     let batches: Vec<RecordBatch> = reader.try_collect().await?;
///     assert_eq!(batches.len(), 1);
///     assert_eq!(batches[0].num_rows(), 3);
///     Ok(())
/// }
/// ```
pub struct AsyncAvroFileReader<R> {
    // Members required to fetch data
    /// Byte range of the file assigned to this reader; `range.end` may be
    /// extended when extra header bytes are fetched mid-block (see `read_next`).
    range: Range<u64>,
    /// Total size of the underlying file in bytes.
    file_size: u64,

    // Members required to actually decode and read data
    /// Decodes record bytes into Arrow `RecordBatch`es.
    decoder: Decoder,
    /// Decodes OCF block framing (count, size, data, sync).
    block_decoder: BlockDecoder,
    /// Compression codec from the file header, if any.
    codec: Option<CompressionCodec>,
    /// 16-byte sync marker from the file header, used to find block boundaries.
    sync_marker: [u8; 16],

    // Members keeping the current state of the reader
    reader_state: ReaderState<R>,
    /// Set while completing a block that the provided range split; guards
    /// against looping forever on a truncated/corrupted file.
    finishing_partial_block: bool,
}
| |
| impl<R> AsyncAvroFileReader<R> { |
    /// Returns a builder for a new [`Self`], allowing some optional parameters.
    ///
    /// `reader` supplies the bytes, `file_size` is the total file length in
    /// bytes, and `batch_size` caps the number of rows per emitted batch.
    pub fn builder(reader: R, file_size: u64, batch_size: usize) -> ReaderBuilder<R> {
        ReaderBuilder::new(reader, file_size, batch_size)
    }
| |
    /// Assembles a reader from pre-parsed header state (decoder, codec and
    /// sync marker); normally reached via [`ReaderBuilder::try_build`].
    fn new(
        range: Range<u64>,
        file_size: u64,
        decoder: Decoder,
        codec: Option<CompressionCodec>,
        sync_marker: [u8; 16],
        reader_state: ReaderState<R>,
    ) -> Self {
        Self {
            range,
            file_size,

            decoder,
            block_decoder: Default::default(),
            codec,
            sync_marker,

            reader_state,
            finishing_partial_block: false,
        }
    }
| |
    /// Returns the Arrow schema for batches produced by this reader.
    ///
    /// The schema is determined by the writer schema in the file and the reader schema provided to the builder.
    pub fn schema(&self) -> SchemaRef {
        self.decoder.schema()
    }
| |
| /// Calculate the byte range needed to complete the current block. |
| /// Only valid when block_decoder is in Data or Sync state. |
| /// Returns the range to fetch, or an error if EOF would be reached. |
| fn remaining_block_range(&self) -> Result<Range<u64>, AvroError> { |
| let remaining = self.block_decoder.bytes_remaining() as u64 |
| + match self.block_decoder.state() { |
| BlockDecoderState::Data => 16, // Include sync marker |
| BlockDecoderState::Sync => 0, |
| state => { |
| return Err(AvroError::General(format!( |
| "remaining_block_range called in unexpected state: {state:?}" |
| ))); |
| } |
| }; |
| |
| let fetch_end = self.range.end + remaining; |
| if fetch_end > self.file_size { |
| return Err(AvroError::EOF( |
| "Avro block requires more bytes than what exists in the file".into(), |
| )); |
| } |
| |
| Ok(self.range.end..fetch_end) |
| } |
| |
    /// Terminate the stream after returning this error once.
    ///
    /// Installs [`ReaderState::Finished`] so every subsequent poll yields
    /// `None`, then emits `error` as the stream's final item.
    #[inline]
    fn finish_with_error(
        &mut self,
        error: AvroError,
    ) -> Poll<Option<Result<RecordBatch, AvroError>>> {
        self.reader_state = ReaderState::Finished;
        Poll::Ready(Some(Err(error)))
    }
| |
    /// Switch to [`ReaderState::Flushing`] so subsequent polls drain the
    /// decoder's buffered records into (possibly partial) output batches.
    #[inline]
    fn start_flushing(&mut self) {
        self.reader_state = ReaderState::Flushing;
    }
| |
| /// Drain any remaining buffered records from the decoder. |
| #[inline] |
| fn poll_flush(&mut self) -> Poll<Option<Result<RecordBatch, AvroError>>> { |
| match self.decoder.flush() { |
| Ok(Some(batch)) => { |
| self.reader_state = ReaderState::Flushing; |
| Poll::Ready(Some(Ok(batch))) |
| } |
| Ok(None) => { |
| self.reader_state = ReaderState::Finished; |
| Poll::Ready(None) |
| } |
| Err(e) => self.finish_with_error(e), |
| } |
| } |
| } |
| |
| impl<R: AsyncFileReader + Unpin + 'static> AsyncAvroFileReader<R> { |
| // The forbid question mark thing shouldn't apply here, as it is within the future, |
| // so exported this to a separate function. |
| async fn fetch_bytes(mut reader: R, range: Range<u64>) -> Result<(R, Bytes), AvroError> { |
| let data = reader.get_bytes(range).await?; |
| Ok((reader, data)) |
| } |
| |
    /// Core state machine driving the stream.
    ///
    /// Each iteration takes the current [`ReaderState`] out of `self`
    /// (leaving `InvalidState` behind so ownership of `R` can move), handles
    /// it, and installs the next state before looping or returning. All
    /// errors are routed through `finish_with_error`, which is why `?` is
    /// deliberately forbidden here.
    #[forbid(clippy::question_mark_used)]
    fn read_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<Result<RecordBatch, AvroError>>> {
        loop {
            match mem::replace(&mut self.reader_state, ReaderState::InvalidState) {
                ReaderState::Idle { reader } => {
                    let range = self.range.clone();
                    if range.start >= range.end {
                        return self.finish_with_error(AvroError::InvalidArgument(format!(
                            "Invalid range specified for Avro file: start {} >= end {}, file_size: {}",
                            range.start, range.end, self.file_size
                        )));
                    }

                    let future = Self::fetch_bytes(reader, range).boxed();
                    self.reader_state = ReaderState::FetchingData {
                        future,
                        next_behaviour: FetchNextBehaviour::ReadSyncMarker,
                    };
                }
                ReaderState::FetchingData {
                    mut future,
                    next_behaviour,
                } => {
                    let (reader, data_chunk) = match future.poll_unpin(cx) {
                        Poll::Ready(Ok(data)) => data,
                        Poll::Ready(Err(e)) => return self.finish_with_error(e),
                        Poll::Pending => {
                            // Not ready yet: put the future back so the next
                            // poll resumes it, then propagate Pending.
                            self.reader_state = ReaderState::FetchingData {
                                future,
                                next_behaviour,
                            };
                            return Poll::Pending;
                        }
                    };

                    match next_behaviour {
                        FetchNextBehaviour::ReadSyncMarker => {
                            let sync_marker_pos = data_chunk
                                .windows(16)
                                .position(|slice| slice == self.sync_marker);
                            let block_start = match sync_marker_pos {
                                Some(pos) => pos + 16, // Move past the sync marker
                                None => {
                                    // Sync marker not found, valid if we arbitrarily split the file at its end.
                                    self.reader_state = ReaderState::Finished;
                                    return Poll::Ready(None);
                                }
                            };

                            self.reader_state = ReaderState::DecodingBlock {
                                reader,
                                data: data_chunk.slice(block_start..),
                            };
                        }
                        FetchNextBehaviour::DecodeVLQHeader => {
                            let mut data = data_chunk;

                            // Feed bytes one at a time until we reach Data state (VLQ header complete)
                            while !matches!(self.block_decoder.state(), BlockDecoderState::Data) {
                                if data.is_empty() {
                                    return self.finish_with_error(AvroError::EOF(
                                        "Unexpected EOF while reading Avro block header".into(),
                                    ));
                                }
                                let consumed = match self.block_decoder.decode(&data[..1]) {
                                    Ok(consumed) => consumed,
                                    Err(e) => return self.finish_with_error(e),
                                };
                                if consumed == 0 {
                                    return self.finish_with_error(AvroError::General(
                                        "BlockDecoder failed to consume byte during VLQ header parsing"
                                            .into(),
                                    ));
                                }
                                data = data.slice(consumed..);
                            }

                            // Now we know the block size. Slice remaining data to what we need.
                            let bytes_remaining = self.block_decoder.bytes_remaining();
                            let data_to_use = data.slice(..data.len().min(bytes_remaining));
                            let consumed = match self.block_decoder.decode(&data_to_use) {
                                Ok(consumed) => consumed,
                                Err(e) => return self.finish_with_error(e),
                            };
                            if consumed != data_to_use.len() {
                                return self.finish_with_error(AvroError::General(
                                    "BlockDecoder failed to consume all bytes after VLQ header parsing"
                                        .into(),
                                ));
                            }

                            // May need more data to finish the block.
                            let range_to_fetch = match self.remaining_block_range() {
                                Ok(range) if range.is_empty() => {
                                    // All bytes fetched, move to decoding block directly
                                    self.reader_state = ReaderState::DecodingBlock {
                                        reader,
                                        data: Bytes::new(),
                                    };
                                    continue;
                                }
                                Ok(range) => range,
                                Err(e) => return self.finish_with_error(e),
                            };

                            let future = Self::fetch_bytes(reader, range_to_fetch).boxed();
                            self.reader_state = ReaderState::FetchingData {
                                future,
                                next_behaviour: FetchNextBehaviour::ContinueDecoding,
                            };
                            continue;
                        }
                        FetchNextBehaviour::ContinueDecoding => {
                            self.reader_state = ReaderState::DecodingBlock {
                                reader,
                                data: data_chunk,
                            };
                        }
                    }
                }
                ReaderState::InvalidState => {
                    return self.finish_with_error(AvroError::General(
                        "AsyncAvroFileReader in invalid state".into(),
                    ));
                }
                ReaderState::DecodingBlock { reader, mut data } => {
                    // Try to decode another block from the buffered reader.
                    let consumed = match self.block_decoder.decode(&data) {
                        Ok(consumed) => consumed,
                        Err(e) => return self.finish_with_error(e),
                    };
                    data = data.slice(consumed..);

                    // If we reached the end of the block, flush it, and move to read batches.
                    if let Some(block) = self.block_decoder.flush() {
                        // Successfully decoded a block.
                        let block_count = block.count;
                        let block_data = Bytes::from_owner(if let Some(ref codec) = self.codec {
                            match codec.decompress(&block.data) {
                                Ok(decompressed) => decompressed,
                                Err(e) => return self.finish_with_error(e),
                            }
                        } else {
                            block.data
                        });

                        // Since we have an active block, move to reading batches
                        self.reader_state = ReaderState::ReadingBatches {
                            reader,
                            data,
                            block_data,
                            remaining_in_block: block_count,
                        };
                        continue;
                    }

                    // data should always be consumed unless Finished, if it wasn't, something went wrong
                    if !data.is_empty() {
                        return self.finish_with_error(AvroError::General(
                            "BlockDecoder failed to make progress decoding Avro block".into(),
                        ));
                    }

                    if matches!(self.block_decoder.state(), BlockDecoderState::Finished) {
                        // We've already flushed, so if no batch was produced, we are simply done.
                        self.finishing_partial_block = false;
                        self.start_flushing();
                        continue;
                    }

                    // If we've tried the following stage before, and still can't decode,
                    // this means the file is truncated or corrupted.
                    if self.finishing_partial_block {
                        return self.finish_with_error(AvroError::EOF(
                            "Unexpected EOF while reading last Avro block".into(),
                        ));
                    }

                    // Avro splitting case: block is incomplete, we need to:
                    // 1. Parse the length so we know how much to read
                    // 2. Fetch more data from the reader
                    // 3. Create a new block data from the remaining slice and the newly fetched data
                    // 4. Continue decoding until end of block
                    self.finishing_partial_block = true;

                    // Mid-block, but we don't know how many bytes are missing yet
                    if matches!(
                        self.block_decoder.state(),
                        BlockDecoderState::Count | BlockDecoderState::Size
                    ) {
                        // Max VLQ header is 20 bytes (10 bytes each for count and size).
                        // Fetch just enough to complete it.
                        const MAX_VLQ_HEADER_SIZE: u64 = 20;
                        let fetch_end = (self.range.end + MAX_VLQ_HEADER_SIZE).min(self.file_size);

                        // If there is nothing more to fetch, error out
                        if fetch_end == self.range.end {
                            return self.finish_with_error(AvroError::EOF(
                                "Unexpected EOF while reading Avro block header".into(),
                            ));
                        }

                        let range_to_fetch = self.range.end..fetch_end;
                        self.range.end = fetch_end; // Track that we've fetched these bytes

                        let future = Self::fetch_bytes(reader, range_to_fetch).boxed();
                        self.reader_state = ReaderState::FetchingData {
                            future,
                            next_behaviour: FetchNextBehaviour::DecodeVLQHeader,
                        };
                        continue;
                    }

                    // Otherwise, we're mid-block but know how many bytes are remaining to fetch.
                    let range_to_fetch = match self.remaining_block_range() {
                        Ok(range) => range,
                        Err(e) => return self.finish_with_error(e),
                    };

                    let future = Self::fetch_bytes(reader, range_to_fetch).boxed();
                    self.reader_state = ReaderState::FetchingData {
                        future,
                        next_behaviour: FetchNextBehaviour::ContinueDecoding,
                    };
                    continue;
                }
                ReaderState::ReadingBatches {
                    reader,
                    data,
                    mut block_data,
                    mut remaining_in_block,
                } => {
                    let (consumed, records_decoded) =
                        match self.decoder.decode_block(&block_data, remaining_in_block) {
                            Ok((consumed, records_decoded)) => (consumed, records_decoded),
                            Err(e) => return self.finish_with_error(e),
                        };

                    remaining_in_block -= records_decoded;

                    if remaining_in_block == 0 {
                        if data.is_empty() {
                            // No more data to read, drain remaining buffered records
                            self.start_flushing();
                        } else {
                            // Finished this block, move to decode next block in the next iteration
                            self.reader_state = ReaderState::DecodingBlock { reader, data };
                        }
                    } else {
                        // Still more records to decode in this block, slice the already-read data and stay in this state
                        block_data = block_data.slice(consumed..);
                        self.reader_state = ReaderState::ReadingBatches {
                            reader,
                            data,
                            block_data,
                            remaining_in_block,
                        };
                    }

                    // We have a full batch ready, emit it
                    // (This is not mutually exclusive with the block being finished, so the state change is valid)
                    if self.decoder.batch_is_full() {
                        return match self.decoder.flush() {
                            Ok(Some(batch)) => Poll::Ready(Some(Ok(batch))),
                            Ok(None) => self.finish_with_error(AvroError::General(
                                "Decoder reported a full batch, but flush returned None".into(),
                            )),
                            Err(e) => self.finish_with_error(e),
                        };
                    }
                }
                ReaderState::Flushing => {
                    return self.poll_flush();
                }
                ReaderState::Finished => {
                    // Terminal: once finished (including after an error), always yield None
                    self.reader_state = ReaderState::Finished;
                    return Poll::Ready(None);
                }
            }
        }
    }
| } |
| |
| // To maintain compatibility with the expected stream results in the ecosystem, this returns ArrowError. |
| impl<R: AsyncFileReader + Unpin + 'static> Stream for AsyncAvroFileReader<R> { |
| type Item = Result<RecordBatch, ArrowError>; |
| |
| fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { |
| self.read_next(cx).map_err(Into::into) |
| } |
| } |
| |
| #[cfg(all(test, feature = "object_store"))] |
| mod tests { |
| use super::*; |
| use crate::codec::Tz; |
| use crate::schema::{ |
| AVRO_NAME_METADATA_KEY, AVRO_NAMESPACE_METADATA_KEY, AvroSchema, SCHEMA_METADATA_KEY, |
| }; |
| use arrow_array::cast::AsArray; |
| use arrow_array::types::{Int32Type, Int64Type}; |
| use arrow_array::*; |
| use arrow_schema::{DataType, Field, Schema, SchemaRef, TimeUnit}; |
| use futures::{StreamExt, TryStreamExt}; |
| use object_store::local::LocalFileSystem; |
| use object_store::path::Path; |
| use object_store::{ObjectStore, ObjectStoreExt}; |
| use std::collections::HashMap; |
| use std::sync::Arc; |
| |
| fn arrow_test_data(file: &str) -> String { |
| let base = |
| std::env::var("ARROW_TEST_DATA").unwrap_or_else(|_| "../testing/data".to_string()); |
| format!("{}/{}", base, file) |
| } |
| |
    /// The `alltypes_plain` schema pinned to the `+00:00` timezone used by most tests.
    fn get_alltypes_schema() -> SchemaRef {
        get_alltypes_schema_with_tz("+00:00")
    }
| |
    /// Arrow schema matching `avro/alltypes_plain.avro`, with the timestamp
    /// column's timezone set to `tz_id` and the writer's Avro schema JSON
    /// attached as schema metadata.
    fn get_alltypes_schema_with_tz(tz_id: &str) -> SchemaRef {
        let schema = Schema::new(vec![
            Field::new("id", DataType::Int32, true),
            Field::new("bool_col", DataType::Boolean, true),
            Field::new("tinyint_col", DataType::Int32, true),
            Field::new("smallint_col", DataType::Int32, true),
            Field::new("int_col", DataType::Int32, true),
            Field::new("bigint_col", DataType::Int64, true),
            Field::new("float_col", DataType::Float32, true),
            Field::new("double_col", DataType::Float64, true),
            Field::new("date_string_col", DataType::Binary, true),
            Field::new("string_col", DataType::Binary, true),
            Field::new(
                "timestamp_col",
                DataType::Timestamp(TimeUnit::Microsecond, Some(tz_id.into())),
                true,
            ),
        ])
        .with_metadata(HashMap::from([(
            SCHEMA_METADATA_KEY.into(),
            r#"{
            "type": "record",
            "name": "topLevelRecord",
            "fields": [
                {
                    "name": "id",
                    "type": [
                        "int",
                        "null"
                    ]
                },
                {
                    "name": "bool_col",
                    "type": [
                        "boolean",
                        "null"
                    ]
                },
                {
                    "name": "tinyint_col",
                    "type": [
                        "int",
                        "null"
                    ]
                },
                {
                    "name": "smallint_col",
                    "type": [
                        "int",
                        "null"
                    ]
                },
                {
                    "name": "int_col",
                    "type": [
                        "int",
                        "null"
                    ]
                },
                {
                    "name": "bigint_col",
                    "type": [
                        "long",
                        "null"
                    ]
                },
                {
                    "name": "float_col",
                    "type": [
                        "float",
                        "null"
                    ]
                },
                {
                    "name": "double_col",
                    "type": [
                        "double",
                        "null"
                    ]
                },
                {
                    "name": "date_string_col",
                    "type": [
                        "bytes",
                        "null"
                    ]
                },
                {
                    "name": "string_col",
                    "type": [
                        "bytes",
                        "null"
                    ]
                },
                {
                    "name": "timestamp_col",
                    "type": [
                        {
                            "type": "long",
                            "logicalType": "timestamp-micros"
                        },
                        "null"
                    ]
                }
            ]
        }
        "#
            .into(),
        )]));
        Arc::new(schema)
    }
| |
    /// Arrow schema matching `avro/alltypes_nulls_plain.avro`; every column is
    /// a null-first union with a `null` default in the attached Avro schema.
    fn get_alltypes_with_nulls_schema() -> SchemaRef {
        let schema = Schema::new(vec![
            Field::new("string_col", DataType::Binary, true),
            Field::new("int_col", DataType::Int32, true),
            Field::new("bool_col", DataType::Boolean, true),
            Field::new("bigint_col", DataType::Int64, true),
            Field::new("float_col", DataType::Float32, true),
            Field::new("double_col", DataType::Float64, true),
            Field::new("bytes_col", DataType::Binary, true),
        ])
        .with_metadata(HashMap::from([(
            SCHEMA_METADATA_KEY.into(),
            r#"{
            "type": "record",
            "name": "topLevelRecord",
            "fields": [
                {
                    "name": "string_col",
                    "type": [
                        "null",
                        "string"
                    ],
                    "default": null
                },
                {
                    "name": "int_col",
                    "type": [
                        "null",
                        "int"
                    ],
                    "default": null
                },
                {
                    "name": "bool_col",
                    "type": [
                        "null",
                        "boolean"
                    ],
                    "default": null
                },
                {
                    "name": "bigint_col",
                    "type": [
                        "null",
                        "long"
                    ],
                    "default": null
                },
                {
                    "name": "float_col",
                    "type": [
                        "null",
                        "float"
                    ],
                    "default": null
                },
                {
                    "name": "double_col",
                    "type": [
                        "null",
                        "double"
                    ],
                    "default": null
                },
                {
                    "name": "bytes_col",
                    "type": [
                        "null",
                        "bytes"
                    ],
                    "default": null
                }
            ]
        }"#
            .into(),
        )]));

        Arc::new(schema)
    }
| |
    /// Arrow schema matching `avro/nested_records.avro`: nested structs and
    /// lists of structs, each carrying the Avro record name/namespace in
    /// field metadata so the reader can match them against the writer schema.
    fn get_nested_records_schema() -> SchemaRef {
        let schema = Schema::new(vec![
            Field::new(
                "f1",
                DataType::Struct(
                    vec![
                        Field::new("f1_1", DataType::Utf8, false),
                        Field::new("f1_2", DataType::Int32, false),
                        Field::new(
                            "f1_3",
                            DataType::Struct(
                                vec![Field::new("f1_3_1", DataType::Float64, false)].into(),
                            ),
                            false,
                        )
                        .with_metadata(HashMap::from([
                            (AVRO_NAMESPACE_METADATA_KEY.to_owned(), "ns3".to_owned()),
                            (AVRO_NAME_METADATA_KEY.to_owned(), "record3".to_owned()),
                        ])),
                    ]
                    .into(),
                ),
                false,
            )
            .with_metadata(HashMap::from([
                (AVRO_NAMESPACE_METADATA_KEY.to_owned(), "ns2".to_owned()),
                (AVRO_NAME_METADATA_KEY.to_owned(), "record2".to_owned()),
            ])),
            Field::new(
                "f2",
                DataType::List(Arc::new(
                    Field::new(
                        "item",
                        DataType::Struct(
                            vec![
                                Field::new("f2_1", DataType::Boolean, false),
                                Field::new("f2_2", DataType::Float32, false),
                            ]
                            .into(),
                        ),
                        false,
                    )
                    .with_metadata(HashMap::from([
                        (AVRO_NAMESPACE_METADATA_KEY.to_owned(), "ns4".to_owned()),
                        (AVRO_NAME_METADATA_KEY.to_owned(), "record4".to_owned()),
                    ])),
                )),
                false,
            ),
            Field::new(
                "f3",
                DataType::Struct(vec![Field::new("f3_1", DataType::Utf8, false)].into()),
                true,
            )
            .with_metadata(HashMap::from([
                (AVRO_NAMESPACE_METADATA_KEY.to_owned(), "ns5".to_owned()),
                (AVRO_NAME_METADATA_KEY.to_owned(), "record5".to_owned()),
            ])),
            Field::new(
                "f4",
                DataType::List(Arc::new(
                    Field::new(
                        "item",
                        DataType::Struct(vec![Field::new("f4_1", DataType::Int64, false)].into()),
                        true,
                    )
                    .with_metadata(HashMap::from([
                        (AVRO_NAMESPACE_METADATA_KEY.to_owned(), "ns6".to_owned()),
                        (AVRO_NAME_METADATA_KEY.to_owned(), "record6".to_owned()),
                    ])),
                )),
                false,
            ),
        ])
        .with_metadata(HashMap::from([(
            SCHEMA_METADATA_KEY.into(),
            r#"{
            "type": "record",
            "namespace": "ns1",
            "name": "record1",
            "fields": [
                {
                    "name": "f1",
                    "type": {
                        "type": "record",
                        "namespace": "ns2",
                        "name": "record2",
                        "fields": [
                            {
                                "name": "f1_1",
                                "type": "string"
                            },
                            {
                                "name": "f1_2",
                                "type": "int"
                            },
                            {
                                "name": "f1_3",
                                "type": {
                                    "type": "record",
                                    "namespace": "ns3",
                                    "name": "record3",
                                    "fields": [
                                        {
                                            "name": "f1_3_1",
                                            "type": "double"
                                        }
                                    ]
                                }
                            }
                        ]
                    }
                },
                {
                    "name": "f2",
                    "type": {
                        "type": "array",
                        "items": {
                            "type": "record",
                            "namespace": "ns4",
                            "name": "record4",
                            "fields": [
                                {
                                    "name": "f2_1",
                                    "type": "boolean"
                                },
                                {
                                    "name": "f2_2",
                                    "type": "float"
                                }
                            ]
                        }
                    }
                },
                {
                    "name": "f3",
                    "type": [
                        "null",
                        {
                            "type": "record",
                            "namespace": "ns5",
                            "name": "record5",
                            "fields": [
                                {
                                    "name": "f3_1",
                                    "type": "string"
                                }
                            ]
                        }
                    ],
                    "default": null
                },
                {
                    "name": "f4",
                    "type": {
                        "type": "array",
                        "items": [
                            "null",
                            {
                                "type": "record",
                                "namespace": "ns6",
                                "name": "record6",
                                "fields": [
                                    {
                                        "name": "f4_1",
                                        "type": "long"
                                    }
                                ]
                            }
                        ]
                    }
                }
            ]
        }
        "#
            .into(),
        )]));

        Arc::new(schema)
    }
| |
    /// Read `path` through [`AvroObjectReader`] over a local filesystem
    /// object store, applying the optional reader `schema`, column
    /// `projection` and byte `range`, and collect every produced batch.
    async fn read_async_file(
        path: &str,
        batch_size: usize,
        range: Option<Range<u64>>,
        schema: Option<SchemaRef>,
        projection: Option<Vec<usize>>,
    ) -> Result<Vec<RecordBatch>, ArrowError> {
        let store: Arc<dyn ObjectStore> = Arc::new(LocalFileSystem::new());
        let location = Path::from_filesystem_path(path).unwrap();

        // `head` supplies the object size the reader needs to bound fetches.
        let file_size = store.head(&location).await.unwrap().size;

        let file_reader = AvroObjectReader::new(store, location);
        let mut builder = AsyncAvroFileReader::builder(file_reader, file_size, batch_size);

        if let Some(s) = schema {
            let reader_schema = AvroSchema::try_from(s.as_ref())?;
            builder = builder.with_reader_schema(reader_schema);
        }

        if let Some(proj) = projection {
            builder = builder.with_projection(proj);
        }

        if let Some(range) = range {
            builder = builder.with_range(range);
        }

        let reader = builder.try_build().await?;
        reader.try_collect().await
    }
| |
| #[tokio::test] |
| async fn test_full_file_read() { |
| let file = arrow_test_data("avro/alltypes_plain.avro"); |
| let schema = get_alltypes_schema(); |
| let batches = read_async_file(&file, 1024, None, Some(schema), None) |
| .await |
| .unwrap(); |
| let batch = &batches[0]; |
| |
| assert_eq!(batch.num_rows(), 8); |
| assert_eq!(batch.num_columns(), 11); |
| |
| let id_array = batch |
| .column(0) |
| .as_any() |
| .downcast_ref::<Int32Array>() |
| .unwrap(); |
| assert_eq!(id_array.value(0), 4); |
| assert_eq!(id_array.value(7), 1); |
| } |
| |
| #[tokio::test] |
| async fn test_small_batch_size() { |
| let file = arrow_test_data("avro/alltypes_plain.avro"); |
| let schema = get_alltypes_schema(); |
| let batches = read_async_file(&file, 2, None, Some(schema), None) |
| .await |
| .unwrap(); |
| assert_eq!(batches.len(), 4); |
| |
| let batch = &batches[0]; |
| |
| assert_eq!(batch.num_rows(), 2); |
| assert_eq!(batch.num_columns(), 11); |
| } |
| |
| #[tokio::test] |
| async fn test_batch_size_one() { |
| let file = arrow_test_data("avro/alltypes_plain.avro"); |
| let schema = get_alltypes_schema(); |
| let batches = read_async_file(&file, 1, None, Some(schema), None) |
| .await |
| .unwrap(); |
| let batch = &batches[0]; |
| |
| assert_eq!(batches.len(), 8); |
| assert_eq!(batch.num_rows(), 1); |
| } |
| |
| #[tokio::test] |
| async fn test_batch_size_larger_than_file() { |
| let file = arrow_test_data("avro/alltypes_plain.avro"); |
| let schema = get_alltypes_schema(); |
| let batches = read_async_file(&file, 10000, None, Some(schema), None) |
| .await |
| .unwrap(); |
| let batch = &batches[0]; |
| |
| assert_eq!(batch.num_rows(), 8); |
| } |
| |
| #[tokio::test] |
| async fn test_empty_range() { |
| let file = arrow_test_data("avro/alltypes_plain.avro"); |
| let range = 100..100; |
| let schema = get_alltypes_schema(); |
| let batches = read_async_file(&file, 1024, Some(range), Some(schema), None) |
| .await |
| .unwrap(); |
| assert_eq!(batches.len(), 0); |
| } |
| |
| #[tokio::test] |
| async fn test_range_starting_at_zero() { |
| // Tests that range starting at 0 correctly skips header |
| let file = arrow_test_data("avro/alltypes_plain.avro"); |
| let store = Arc::new(LocalFileSystem::new()); |
| let location = Path::from_filesystem_path(&file).unwrap(); |
| let meta = store.head(&location).await.unwrap(); |
| |
| let range = 0..meta.size; |
| let schema = get_alltypes_schema(); |
| let batches = read_async_file(&file, 1024, Some(range), Some(schema), None) |
| .await |
| .unwrap(); |
| let batch = &batches[0]; |
| |
| assert_eq!(batch.num_rows(), 8); |
| } |
| |
| #[tokio::test] |
| async fn test_range_after_header() { |
| let file = arrow_test_data("avro/alltypes_plain.avro"); |
| let store = Arc::new(LocalFileSystem::new()); |
| let location = Path::from_filesystem_path(&file).unwrap(); |
| let meta = store.head(&location).await.unwrap(); |
| |
| let range = 100..meta.size; |
| let schema = get_alltypes_schema(); |
| let batches = read_async_file(&file, 1024, Some(range), Some(schema), None) |
| .await |
| .unwrap(); |
| let batch = &batches[0]; |
| |
| assert!(batch.num_rows() > 0); |
| } |
| |
| #[tokio::test] |
| async fn test_range_no_sync_marker() { |
| // Small range unlikely to contain sync marker |
| let file = arrow_test_data("avro/alltypes_plain.avro"); |
| let range = 50..150; |
| let schema = get_alltypes_schema(); |
| let batches = read_async_file(&file, 1024, Some(range), Some(schema), None) |
| .await |
| .unwrap(); |
| assert_eq!(batches.len(), 0); |
| } |
| |
| #[tokio::test] |
| async fn test_range_starting_mid_file() { |
| let file = arrow_test_data("avro/alltypes_plain.avro"); |
| |
| let range = 700..768; // Header ends at 675, so this should be mid-block |
| let schema = get_alltypes_schema(); |
| let batches = read_async_file(&file, 1024, Some(range), Some(schema), None) |
| .await |
| .unwrap(); |
| assert_eq!(batches.len(), 0); |
| } |
| |
| #[tokio::test] |
| async fn test_range_ending_at_file_size() { |
| let file = arrow_test_data("avro/alltypes_plain.avro"); |
| let store = Arc::new(LocalFileSystem::new()); |
| let location = Path::from_filesystem_path(&file).unwrap(); |
| let meta = store.head(&location).await.unwrap(); |
| |
| let range = 200..meta.size; |
| let schema = get_alltypes_schema(); |
| let batches = read_async_file(&file, 1024, Some(range), Some(schema), None) |
| .await |
| .unwrap(); |
| let batch = &batches[0]; |
| |
| assert_eq!(batch.num_rows(), 8); |
| } |
| |
| #[tokio::test] |
| async fn test_incomplete_block_requires_fetch() { |
| // Range ends mid-block, should trigger fetching_rem_block logic |
| let file = arrow_test_data("avro/alltypes_plain.avro"); |
| let range = 0..1200; |
| let schema = get_alltypes_schema(); |
| let batches = read_async_file(&file, 1024, Some(range), Some(schema), None) |
| .await |
| .unwrap(); |
| let batch = &batches[0]; |
| |
| assert_eq!(batch.num_rows(), 8) |
| } |
| |
| #[tokio::test] |
| async fn test_partial_vlq_header_requires_fetch() { |
| // Range ends mid-VLQ header, triggering the Count|Size partial fetch logic. |
| let file = arrow_test_data("avro/alltypes_plain.avro"); |
| let range = 16..676; // Header should end at 675 |
| let schema = get_alltypes_schema(); |
| let batches = read_async_file(&file, 1024, Some(range), Some(schema), None) |
| .await |
| .unwrap(); |
| let batch = &batches[0]; |
| |
| assert_eq!(batch.num_rows(), 8) |
| } |
| |
| #[cfg(feature = "snappy")] |
| #[tokio::test] |
| async fn test_snappy_compressed_with_range() { |
| { |
| let file = arrow_test_data("avro/alltypes_plain.snappy.avro"); |
| let store = Arc::new(LocalFileSystem::new()); |
| let location = Path::from_filesystem_path(&file).unwrap(); |
| let meta = store.head(&location).await.unwrap(); |
| |
| let range = 200..meta.size; |
| let schema = get_alltypes_schema(); |
| let batches = read_async_file(&file, 1024, Some(range), Some(schema), None) |
| .await |
| .unwrap(); |
| let batch = &batches[0]; |
| |
| assert!(batch.num_rows() > 0); |
| } |
| } |
| |
| #[tokio::test] |
| async fn test_nulls() { |
| let file = arrow_test_data("avro/alltypes_nulls_plain.avro"); |
| let schema = get_alltypes_with_nulls_schema(); |
| let batches = read_async_file(&file, 1024, None, Some(schema), None) |
| .await |
| .unwrap(); |
| let batch = &batches[0]; |
| |
| assert_eq!(batch.num_rows(), 1); |
| for col in batch.columns() { |
| assert!(col.is_null(0)); |
| } |
| } |
| |
| #[tokio::test] |
| async fn test_nested_records() { |
| let file = arrow_test_data("avro/nested_records.avro"); |
| let schema = get_nested_records_schema(); |
| let batches = read_async_file(&file, 1024, None, Some(schema), None) |
| .await |
| .unwrap(); |
| let batch = &batches[0]; |
| |
| assert_eq!(batch.num_rows(), 2); |
| assert!(batch.num_columns() > 0); |
| } |
| |
| #[tokio::test] |
| async fn test_stream_produces_multiple_batches() { |
| let file = arrow_test_data("avro/alltypes_plain.avro"); |
| let store = Arc::new(LocalFileSystem::new()); |
| let location = Path::from_filesystem_path(&file).unwrap(); |
| |
| let file_size = store.head(&location).await.unwrap().size; |
| |
| let file_reader = AvroObjectReader::new(store, location); |
| let schema = get_alltypes_schema(); |
| let reader_schema = AvroSchema::try_from(schema.as_ref()).unwrap(); |
| let reader = AsyncAvroFileReader::builder( |
| file_reader, |
| file_size, |
| 2, // Small batch size to force multiple batches |
| ) |
| .with_reader_schema(reader_schema) |
| .try_build() |
| .await |
| .unwrap(); |
| |
| let batches: Vec<RecordBatch> = reader.try_collect().await.unwrap(); |
| |
| assert!(batches.len() > 1); |
| let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum(); |
| assert_eq!(total_rows, 8); |
| } |
| |
| #[tokio::test] |
| async fn test_stream_early_termination() { |
| let file = arrow_test_data("avro/alltypes_plain.avro"); |
| let store = Arc::new(LocalFileSystem::new()); |
| let location = Path::from_filesystem_path(&file).unwrap(); |
| |
| let file_size = store.head(&location).await.unwrap().size; |
| |
| let file_reader = AvroObjectReader::new(store, location); |
| let schema = get_alltypes_schema(); |
| let reader_schema = AvroSchema::try_from(schema.as_ref()).unwrap(); |
| let reader = AsyncAvroFileReader::builder(file_reader, file_size, 1) |
| .with_reader_schema(reader_schema) |
| .try_build() |
| .await |
| .unwrap(); |
| |
| let first_batch = reader.take(1).try_collect::<Vec<_>>().await.unwrap(); |
| |
| assert_eq!(first_batch.len(), 1); |
| assert!(first_batch[0].num_rows() > 0); |
| } |
| |
| #[tokio::test] |
| async fn test_various_batch_sizes() { |
| let file = arrow_test_data("avro/alltypes_plain.avro"); |
| |
| for batch_size in [1, 2, 3, 5, 7, 11, 100] { |
| let schema = get_alltypes_schema(); |
| let batches = read_async_file(&file, batch_size, None, Some(schema), None) |
| .await |
| .unwrap(); |
| let batch = &batches[0]; |
| |
| // Size should be what was provided, to the limit of the batch in the file |
| assert_eq!( |
| batch.num_rows(), |
| batch_size.min(8), |
| "Failed with batch_size={}", |
| batch_size |
| ); |
| } |
| } |
| |
| #[tokio::test] |
| async fn test_range_larger_than_file() { |
| let file = arrow_test_data("avro/alltypes_plain.avro"); |
| let store = Arc::new(LocalFileSystem::new()); |
| let location = Path::from_filesystem_path(&file).unwrap(); |
| let meta = store.head(&location).await.unwrap(); |
| |
| // Range extends beyond file size |
| let range = 100..(meta.size + 1000); |
| let schema = get_alltypes_schema(); |
| let batches = read_async_file(&file, 1024, Some(range), Some(schema), None) |
| .await |
| .unwrap(); |
| let batch = &batches[0]; |
| |
| // Should clamp to file size |
| assert_eq!(batch.num_rows(), 8); |
| } |
| |
    #[tokio::test]
    async fn test_builder_with_header_info() {
        // Verifies the two-step construction path: read the OCF header once via
        // `read_header_info`, then hand the parsed info to `build_with_header`
        // so the builder does not need to re-fetch and re-parse it.
        let file = arrow_test_data("avro/alltypes_plain.avro");
        let store = Arc::new(LocalFileSystem::new());
        let location = Path::from_filesystem_path(&file).unwrap();

        let file_size = store.head(&location).await.unwrap().size;

        // `read_header_info` takes `&mut` because it fetches bytes through the
        // reader; the same reader is then moved into the builder below.
        let mut file_reader = AvroObjectReader::new(store, location);

        let header_info = read_header_info(&mut file_reader, file_size, None)
            .await
            .unwrap();

        // Known header length for the alltypes_plain.avro fixture.
        assert_eq!(header_info.header_len(), 675);

        // The writer schema decoded from the header must equal the Avro JSON
        // schema embedded in the expected Arrow schema's metadata. Compare as
        // parsed JSON values so formatting differences are irrelevant.
        let writer_schema = header_info.writer_schema().unwrap();
        let expected_avro_json: serde_json::Value = serde_json::from_str(
            get_alltypes_schema()
                .metadata()
                .get(SCHEMA_METADATA_KEY)
                .unwrap(),
        )
        .unwrap();
        let actual_avro_json: serde_json::Value =
            serde_json::from_str(&writer_schema.json_string).unwrap();
        assert_eq!(actual_avro_json, expected_avro_json);

        // Build with the pre-parsed header; no second header read occurs.
        let reader = AsyncAvroFileReader::builder(file_reader, file_size, 1024)
            .build_with_header(header_info)
            .unwrap();

        let batches: Vec<RecordBatch> = reader.try_collect().await.unwrap();

        let batch = &batches[0];
        assert_eq!(batch.num_rows(), 8)
    }
| |
    #[tokio::test]
    async fn test_roundtrip_write_then_async_read() {
        // End-to-end roundtrip: write two batches (with nulls) via the sync
        // `AvroWriter` into a temp file, then read them back through the async
        // reader and verify row counts, schema width, values, and null masks.
        use crate::writer::AvroWriter;
        use arrow_array::{Float64Array, StringArray};
        use std::fs::File;
        use std::io::BufWriter;
        use tempfile::tempdir;

        // Schema with nullable and non-nullable fields of various types
        let schema = Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int32, false),
            Field::new("name", DataType::Utf8, true),
            Field::new("score", DataType::Float64, true),
            Field::new("count", DataType::Int64, false),
        ]));

        let dir = tempdir().unwrap();
        let file_path = dir.path().join("roundtrip_test.avro");

        // Write multiple batches with nulls. The inner scope ensures the writer
        // (and its BufWriter) is dropped/flushed before we read the file back.
        {
            let file = File::create(&file_path).unwrap();
            let writer = BufWriter::new(file);
            let mut avro_writer = AvroWriter::new(writer, schema.as_ref().clone()).unwrap();

            // First batch: 3 rows with some nulls
            let batch1 = RecordBatch::try_new(
                schema.clone(),
                vec![
                    Arc::new(Int32Array::from(vec![1, 2, 3])),
                    Arc::new(StringArray::from(vec![
                        Some("alice"),
                        None,
                        Some("charlie"),
                    ])),
                    Arc::new(Float64Array::from(vec![Some(95.5), Some(87.3), None])),
                    Arc::new(Int64Array::from(vec![10, 20, 30])),
                ],
            )
            .unwrap();
            avro_writer.write(&batch1).unwrap();

            // Second batch: 2 rows
            let batch2 = RecordBatch::try_new(
                schema.clone(),
                vec![
                    Arc::new(Int32Array::from(vec![4, 5])),
                    Arc::new(StringArray::from(vec![Some("diana"), Some("eve")])),
                    Arc::new(Float64Array::from(vec![None, Some(88.0)])),
                    Arc::new(Int64Array::from(vec![40, 50])),
                ],
            )
            .unwrap();
            avro_writer.write(&batch2).unwrap();

            // Writes the trailing sync marker / flushes buffered data.
            avro_writer.finish().unwrap();
        }

        // Read back with small batch size to produce multiple output batches
        let store: Arc<dyn ObjectStore> = Arc::new(LocalFileSystem::new());
        let location = Path::from_filesystem_path(&file_path).unwrap();
        let file_size = store.head(&location).await.unwrap().size;

        let file_reader = AvroObjectReader::new(store, location);
        let reader = AsyncAvroFileReader::builder(file_reader, file_size, 2)
            .try_build()
            .await
            .unwrap();

        let batches: Vec<RecordBatch> = reader.try_collect().await.unwrap();

        // Verify we got multiple output batches due to small batch_size
        assert!(
            batches.len() > 1,
            "Expected multiple batches with batch_size=2"
        );

        // Verify total row count
        let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
        assert_eq!(total_rows, 5);

        // Concatenate all batches to verify data
        let combined = arrow::compute::concat_batches(&batches[0].schema(), &batches).unwrap();
        assert_eq!(combined.num_rows(), 5);
        assert_eq!(combined.num_columns(), 4);

        // Check id column (non-nullable)
        let id_array = combined
            .column(0)
            .as_any()
            .downcast_ref::<Int32Array>()
            .unwrap();
        assert_eq!(id_array.values(), &[1, 2, 3, 4, 5]);

        // Check name column (nullable) - verify nulls are preserved
        // Avro strings are read as Binary by default
        let name_col = combined.column(1);
        let name_array = name_col.as_string::<i32>();
        assert_eq!(name_array.value(0), "alice");
        assert!(name_col.is_null(1)); // second row has null name
        assert_eq!(name_array.value(2), "charlie");

        // Check score column (nullable) - verify nulls are preserved
        let score_array = combined
            .column(2)
            .as_any()
            .downcast_ref::<Float64Array>()
            .unwrap();
        assert!(!score_array.is_null(0));
        assert!((score_array.value(0) - 95.5).abs() < f64::EPSILON);
        assert!(score_array.is_null(2)); // third row has null score
        assert!(score_array.is_null(3)); // fourth row has null score
        assert!(!score_array.is_null(4));
        assert!((score_array.value(4) - 88.0).abs() < f64::EPSILON);

        // Check count column (non-nullable)
        let count_array = combined
            .column(3)
            .as_any()
            .downcast_ref::<Int64Array>()
            .unwrap();
        assert_eq!(count_array.values(), &[10, 20, 30, 40, 50]);
    }
| |
| #[tokio::test] |
| async fn test_alltypes_no_schema_no_projection() { |
| // No reader schema, no projection - uses writer schema from file |
| let file = arrow_test_data("avro/alltypes_plain.avro"); |
| let batches = read_async_file(&file, 1024, None, None, None) |
| .await |
| .unwrap(); |
| let batch = &batches[0]; |
| |
| assert_eq!(batch.num_rows(), 8); |
| assert_eq!(batch.num_columns(), 11); |
| assert_eq!(batch.schema().field(0).name(), "id"); |
| } |
| |
| #[tokio::test] |
| async fn test_alltypes_no_schema_with_projection() { |
| // No reader schema, with projection - project writer schema |
| let file = arrow_test_data("avro/alltypes_plain.avro"); |
| // Project [tinyint_col, id, bigint_col] = indices [2, 0, 5] |
| let batches = read_async_file(&file, 1024, None, None, Some(vec![2, 0, 5])) |
| .await |
| .unwrap(); |
| let batch = &batches[0]; |
| |
| assert_eq!(batch.num_rows(), 8); |
| assert_eq!(batch.num_columns(), 3); |
| assert_eq!(batch.schema().field(0).name(), "tinyint_col"); |
| assert_eq!(batch.schema().field(1).name(), "id"); |
| assert_eq!(batch.schema().field(2).name(), "bigint_col"); |
| |
| // Verify data values |
| let tinyint_col = batch.column(0).as_primitive::<Int32Type>(); |
| assert_eq!(tinyint_col.values(), &[0, 1, 0, 1, 0, 1, 0, 1]); |
| |
| let id = batch.column(1).as_primitive::<Int32Type>(); |
| assert_eq!(id.values(), &[4, 5, 6, 7, 2, 3, 0, 1]); |
| |
| let bigint_col = batch.column(2).as_primitive::<Int64Type>(); |
| assert_eq!(bigint_col.values(), &[0, 10, 0, 10, 0, 10, 0, 10]); |
| } |
| |
| #[tokio::test] |
| async fn test_alltypes_with_schema_no_projection() { |
| // With reader schema, no projection |
| let file = arrow_test_data("avro/alltypes_plain.avro"); |
| let schema = get_alltypes_schema(); |
| let batches = read_async_file(&file, 1024, None, Some(schema), None) |
| .await |
| .unwrap(); |
| let batch = &batches[0]; |
| |
| assert_eq!(batch.num_rows(), 8); |
| assert_eq!(batch.num_columns(), 11); |
| } |
| |
| #[tokio::test] |
| async fn test_alltypes_with_schema_with_projection() { |
| // With reader schema, with projection |
| let file = arrow_test_data("avro/alltypes_plain.avro"); |
| let schema = get_alltypes_schema(); |
| // Project [bool_col, id] = indices [1, 0] |
| let batches = read_async_file(&file, 1024, None, Some(schema), Some(vec![1, 0])) |
| .await |
| .unwrap(); |
| let batch = &batches[0]; |
| |
| assert_eq!(batch.num_rows(), 8); |
| assert_eq!(batch.num_columns(), 2); |
| assert_eq!(batch.schema().field(0).name(), "bool_col"); |
| assert_eq!(batch.schema().field(1).name(), "id"); |
| |
| let bool_col = batch.column(0).as_boolean(); |
| assert!(bool_col.value(0)); |
| assert!(!bool_col.value(1)); |
| |
| let id = batch.column(1).as_primitive::<Int32Type>(); |
| assert_eq!(id.values(), &[4, 5, 6, 7, 2, 3, 0, 1]); |
| } |
| |
| #[tokio::test] |
| async fn test_nested_no_schema_no_projection() { |
| // No reader schema, no projection |
| let file = arrow_test_data("avro/nested_records.avro"); |
| let batches = read_async_file(&file, 1024, None, None, None) |
| .await |
| .unwrap(); |
| let batch = &batches[0]; |
| |
| assert_eq!(batch.num_rows(), 2); |
| assert_eq!(batch.num_columns(), 4); |
| assert_eq!(batch.schema().field(0).name(), "f1"); |
| assert_eq!(batch.schema().field(1).name(), "f2"); |
| assert_eq!(batch.schema().field(2).name(), "f3"); |
| assert_eq!(batch.schema().field(3).name(), "f4"); |
| } |
| |
| #[tokio::test] |
| async fn test_nested_no_schema_with_projection() { |
| // No reader schema, with projection - reorder nested fields |
| let file = arrow_test_data("avro/nested_records.avro"); |
| // Project [f3, f1] = indices [2, 0] |
| let batches = read_async_file(&file, 1024, None, None, Some(vec![2, 0])) |
| .await |
| .unwrap(); |
| let batch = &batches[0]; |
| |
| assert_eq!(batch.num_rows(), 2); |
| assert_eq!(batch.num_columns(), 2); |
| assert_eq!(batch.schema().field(0).name(), "f3"); |
| assert_eq!(batch.schema().field(1).name(), "f1"); |
| } |
| |
| #[tokio::test] |
| async fn test_nested_with_schema_no_projection() { |
| // With reader schema, no projection |
| let file = arrow_test_data("avro/nested_records.avro"); |
| let schema = get_nested_records_schema(); |
| let batches = read_async_file(&file, 1024, None, Some(schema), None) |
| .await |
| .unwrap(); |
| let batch = &batches[0]; |
| |
| assert_eq!(batch.num_rows(), 2); |
| assert_eq!(batch.num_columns(), 4); |
| } |
| |
| #[tokio::test] |
| async fn test_nested_with_schema_with_projection() { |
| // With reader schema, with projection |
| let file = arrow_test_data("avro/nested_records.avro"); |
| let schema = get_nested_records_schema(); |
| // Project [f4, f2, f1] = indices [3, 1, 0] |
| let batches = read_async_file(&file, 1024, None, Some(schema), Some(vec![3, 1, 0])) |
| .await |
| .unwrap(); |
| let batch = &batches[0]; |
| |
| assert_eq!(batch.num_rows(), 2); |
| assert_eq!(batch.num_columns(), 3); |
| assert_eq!(batch.schema().field(0).name(), "f4"); |
| assert_eq!(batch.schema().field(1).name(), "f2"); |
| assert_eq!(batch.schema().field(2).name(), "f1"); |
| } |
| |
| #[tokio::test] |
| async fn test_projection_error_out_of_bounds() { |
| let file = arrow_test_data("avro/alltypes_plain.avro"); |
| // Index 100 is out of bounds for the 11-field schema |
| let err = read_async_file(&file, 1024, None, None, Some(vec![100])) |
| .await |
| .unwrap_err(); |
| assert!(matches!(err, ArrowError::AvroError(_))); |
| assert!(err.to_string().contains("out of bounds")); |
| } |
| |
| #[tokio::test] |
| async fn test_projection_error_duplicate_index() { |
| let file = arrow_test_data("avro/alltypes_plain.avro"); |
| // Duplicate index 0 |
| let err = read_async_file(&file, 1024, None, None, Some(vec![0, 0])) |
| .await |
| .unwrap_err(); |
| assert!(matches!(err, ArrowError::AvroError(_))); |
| assert!(err.to_string().contains("Duplicate projection index")); |
| } |
| |
| #[tokio::test] |
| async fn test_arrow_schema_from_reader_no_reader_schema() { |
| let file = arrow_test_data("avro/alltypes_plain.avro"); |
| let store: Arc<dyn ObjectStore> = Arc::new(LocalFileSystem::new()); |
| let location = Path::from_filesystem_path(&file).unwrap(); |
| let file_size = store.head(&location).await.unwrap().size; |
| |
| let file_reader = AvroObjectReader::new(store, location); |
| let expected_schema = get_alltypes_schema() |
| .as_ref() |
| .clone() |
| .with_metadata(Default::default()); |
| |
| // Build reader without providing reader schema - should use writer schema from file |
| let reader = AsyncAvroFileReader::builder(file_reader, file_size, 1024) |
| .try_build() |
| .await |
| .unwrap(); |
| |
| assert_eq!(reader.schema().as_ref(), &expected_schema); |
| |
| let batches: Vec<RecordBatch> = reader.try_collect().await.unwrap(); |
| let batch = &batches[0]; |
| |
| assert_eq!(batch.schema().as_ref(), &expected_schema); |
| } |
| |
| #[tokio::test] |
| async fn test_arrow_schema_from_reader_with_reader_schema() { |
| let file = arrow_test_data("avro/alltypes_plain.avro"); |
| let store: Arc<dyn ObjectStore> = Arc::new(LocalFileSystem::new()); |
| let location = Path::from_filesystem_path(&file).unwrap(); |
| let file_size = store.head(&location).await.unwrap().size; |
| |
| let file_reader = AvroObjectReader::new(store, location); |
| let schema = get_alltypes_schema() |
| .project(&[0, 1, 7]) |
| .unwrap() |
| .with_metadata(Default::default()); |
| let reader_schema = AvroSchema::try_from(&schema).unwrap(); |
| let expected_schema = schema.clone(); |
| |
| // Build reader with provided reader schema - must apply the projection |
| let reader = AsyncAvroFileReader::builder(file_reader, file_size, 1024) |
| .with_reader_schema(reader_schema) |
| .try_build() |
| .await |
| .unwrap(); |
| |
| assert_eq!(reader.schema().as_ref(), &expected_schema); |
| |
| let batches: Vec<RecordBatch> = reader.try_collect().await.unwrap(); |
| let batch = &batches[0]; |
| |
| assert_eq!(batch.schema().as_ref(), &expected_schema); |
| } |
| |
| #[tokio::test] |
| async fn test_arrow_schema_from_reader_nested_records() { |
| let file = arrow_test_data("avro/nested_records.avro"); |
| let store: Arc<dyn ObjectStore> = Arc::new(LocalFileSystem::new()); |
| let location = Path::from_filesystem_path(&file).unwrap(); |
| let file_size = store.head(&location).await.unwrap().size; |
| |
| let file_reader = AvroObjectReader::new(store, location); |
| |
| // The schema produced by the reader should match the expected schema, |
| // attaching Avro type name metadata to fields of record and list types. |
| let expected_schema = get_nested_records_schema() |
| .as_ref() |
| .clone() |
| .with_metadata(Default::default()); |
| |
| let reader = AsyncAvroFileReader::builder(file_reader, file_size, 1024) |
| .try_build() |
| .await |
| .unwrap(); |
| |
| assert_eq!(reader.schema().as_ref(), &expected_schema); |
| |
| let batches: Vec<RecordBatch> = reader.try_collect().await.unwrap(); |
| let batch = &batches[0]; |
| |
| assert_eq!(batch.schema().as_ref(), &expected_schema); |
| } |
| |
| #[tokio::test] |
| async fn test_with_header_size_hint_small() { |
| // Use a very small header size hint to force multiple fetches |
| let file = arrow_test_data("avro/alltypes_plain.avro"); |
| let store: Arc<dyn ObjectStore> = Arc::new(LocalFileSystem::new()); |
| let location = Path::from_filesystem_path(&file).unwrap(); |
| let file_size = store.head(&location).await.unwrap().size; |
| |
| let file_reader = AvroObjectReader::new(store, location); |
| let schema = get_alltypes_schema(); |
| let reader_schema = AvroSchema::try_from(schema.as_ref()).unwrap(); |
| |
| // Use a tiny header hint (64 bytes) - header is much larger |
| let reader = AsyncAvroFileReader::builder(file_reader, file_size, 1024) |
| .with_reader_schema(reader_schema) |
| .with_header_size_hint(64) |
| .try_build() |
| .await |
| .unwrap(); |
| |
| let batches: Vec<RecordBatch> = reader.try_collect().await.unwrap(); |
| let batch = &batches[0]; |
| |
| assert_eq!(batch.num_rows(), 8); |
| assert_eq!(batch.num_columns(), 11); |
| } |
| |
| #[tokio::test] |
| async fn test_with_header_size_hint_large() { |
| // Use a larger header size hint than needed |
| let file = arrow_test_data("avro/alltypes_plain.avro"); |
| let store: Arc<dyn ObjectStore> = Arc::new(LocalFileSystem::new()); |
| let location = Path::from_filesystem_path(&file).unwrap(); |
| let file_size = store.head(&location).await.unwrap().size; |
| |
| let file_reader = AvroObjectReader::new(store, location); |
| let schema = get_alltypes_schema(); |
| let reader_schema = AvroSchema::try_from(schema.as_ref()).unwrap(); |
| |
| // Use a large header hint (64KB) |
| let reader = AsyncAvroFileReader::builder(file_reader, file_size, 1024) |
| .with_reader_schema(reader_schema) |
| .with_header_size_hint(64 * 1024) |
| .try_build() |
| .await |
| .unwrap(); |
| |
| let batches: Vec<RecordBatch> = reader.try_collect().await.unwrap(); |
| let batch = &batches[0]; |
| |
| assert_eq!(batch.num_rows(), 8); |
| assert_eq!(batch.num_columns(), 11); |
| } |
| |
| #[tokio::test] |
| async fn test_with_tz_utc() { |
| let file = arrow_test_data("avro/alltypes_plain.avro"); |
| let store: Arc<dyn ObjectStore> = Arc::new(LocalFileSystem::new()); |
| let location = Path::from_filesystem_path(&file).unwrap(); |
| let file_size = store.head(&location).await.unwrap().size; |
| |
| let file_reader = AvroObjectReader::new(store, location); |
| let schema = get_alltypes_schema_with_tz("UTC"); |
| let reader_schema = AvroSchema::try_from(schema.as_ref()).unwrap(); |
| |
| // Specify the time zone ID of "UTC" for timestamp fields with time zone. |
| let reader = AsyncAvroFileReader::builder(file_reader, file_size, 1024) |
| .with_reader_schema(reader_schema) |
| .with_tz(Tz::Utc) |
| .try_build() |
| .await |
| .unwrap(); |
| |
| let batches: Vec<RecordBatch> = reader.try_collect().await.unwrap(); |
| let batch = &batches[0]; |
| |
| assert_eq!(batch.num_columns(), 11); |
| |
| let schema = batch.schema(); |
| let ts_field = schema.field_with_name("timestamp_col").unwrap(); |
| assert!( |
| matches!( |
| ts_field.data_type(), |
| DataType::Timestamp(TimeUnit::Microsecond, Some(tz)) if tz.as_ref() == "UTC" |
| ), |
| "expected Timestamp(Microsecond, Some(\"UTC\")), got {:?}", |
| ts_field.data_type() |
| ); |
| } |
| |
| #[tokio::test] |
| async fn test_with_utf8_view_enabled() { |
| // Test that utf8_view produces StringViewArray instead of StringArray |
| let file = arrow_test_data("avro/nested_records.avro"); |
| let store: Arc<dyn ObjectStore> = Arc::new(LocalFileSystem::new()); |
| let location = Path::from_filesystem_path(&file).unwrap(); |
| let file_size = store.head(&location).await.unwrap().size; |
| |
| let file_reader = AvroObjectReader::new(store, location); |
| |
| let reader = AsyncAvroFileReader::builder(file_reader, file_size, 1024) |
| .with_utf8_view(true) |
| .try_build() |
| .await |
| .unwrap(); |
| |
| let batches: Vec<RecordBatch> = reader.try_collect().await.unwrap(); |
| let batch = &batches[0]; |
| |
| assert_eq!(batch.num_rows(), 2); |
| |
| // The f1 struct contains f1_1 which is a string field |
| // With utf8_view enabled, it should be Utf8View type |
| let f1_col = batch.column(0); |
| let f1_struct = f1_col.as_struct(); |
| let f1_1_field = f1_struct.column_by_name("f1_1").unwrap(); |
| |
| // Check that the data type is Utf8View |
| assert_eq!(f1_1_field.data_type(), &DataType::Utf8View); |
| } |
| |
| #[tokio::test] |
| async fn test_with_utf8_view_disabled() { |
| // Test that without utf8_view, we get regular Utf8 |
| let file = arrow_test_data("avro/nested_records.avro"); |
| let store: Arc<dyn ObjectStore> = Arc::new(LocalFileSystem::new()); |
| let location = Path::from_filesystem_path(&file).unwrap(); |
| let file_size = store.head(&location).await.unwrap().size; |
| |
| let file_reader = AvroObjectReader::new(store, location); |
| |
| let reader = AsyncAvroFileReader::builder(file_reader, file_size, 1024) |
| .with_utf8_view(false) |
| .try_build() |
| .await |
| .unwrap(); |
| |
| let batches: Vec<RecordBatch> = reader.try_collect().await.unwrap(); |
| let batch = &batches[0]; |
| |
| assert_eq!(batch.num_rows(), 2); |
| |
| // The f1 struct contains f1_1 which is a string field |
| // Without utf8_view, it should be regular Utf8 |
| let f1_col = batch.column(0); |
| let f1_struct = f1_col.as_struct(); |
| let f1_1_field = f1_struct.column_by_name("f1_1").unwrap(); |
| |
| assert_eq!(f1_1_field.data_type(), &DataType::Utf8); |
| } |
| |
| #[tokio::test] |
| async fn test_with_strict_mode_disabled_allows_null_second() { |
| // Test that with strict_mode disabled, unions of ['T', 'null'] are allowed |
| // The alltypes_nulls_plain.avro file has unions with null second |
| let file = arrow_test_data("avro/alltypes_nulls_plain.avro"); |
| let store: Arc<dyn ObjectStore> = Arc::new(LocalFileSystem::new()); |
| let location = Path::from_filesystem_path(&file).unwrap(); |
| let file_size = store.head(&location).await.unwrap().size; |
| |
| let file_reader = AvroObjectReader::new(store, location); |
| |
| // Without strict mode, this should succeed |
| let reader = AsyncAvroFileReader::builder(file_reader, file_size, 1024) |
| .with_strict_mode(false) |
| .try_build() |
| .await |
| .unwrap(); |
| |
| let batches: Vec<RecordBatch> = reader.try_collect().await.unwrap(); |
| assert_eq!(batches.len(), 1); |
| assert_eq!(batches[0].num_rows(), 1); |
| } |
| |
| #[tokio::test] |
| async fn test_with_strict_mode_enabled_rejects_null_second() { |
| // Test that with strict_mode enabled, unions of ['T', 'null'] are rejected |
| // The alltypes_plain.avro file has unions like ["int", "null"] (null second) |
| let file = arrow_test_data("avro/alltypes_plain.avro"); |
| let store: Arc<dyn ObjectStore> = Arc::new(LocalFileSystem::new()); |
| let location = Path::from_filesystem_path(&file).unwrap(); |
| let file_size = store.head(&location).await.unwrap().size; |
| |
| let file_reader = AvroObjectReader::new(store, location); |
| |
| // With strict mode, this should fail because of ['T', 'null'] unions |
| let result = AsyncAvroFileReader::builder(file_reader, file_size, 1024) |
| .with_strict_mode(true) |
| .try_build() |
| .await; |
| |
| match result { |
| Ok(_) => panic!("Expected error for strict_mode with ['T', 'null'] union"), |
| Err(err) => { |
| assert!( |
| err.to_string().contains("disallowed in strict_mode"), |
| "Expected strict_mode error, got: {}", |
| err |
| ); |
| } |
| } |
| } |
| |
| #[tokio::test] |
| async fn test_with_strict_mode_enabled_valid_schema() { |
| // Test that strict_mode works with schemas that have proper ['null', 'T'] unions |
| // The nested_records.avro file has properly ordered unions |
| let file = arrow_test_data("avro/nested_records.avro"); |
| let store: Arc<dyn ObjectStore> = Arc::new(LocalFileSystem::new()); |
| let location = Path::from_filesystem_path(&file).unwrap(); |
| let file_size = store.head(&location).await.unwrap().size; |
| |
| let file_reader = AvroObjectReader::new(store, location); |
| |
| // With strict mode, properly ordered unions should still work |
| let reader = AsyncAvroFileReader::builder(file_reader, file_size, 1024) |
| .with_strict_mode(true) |
| .try_build() |
| .await |
| .unwrap(); |
| |
| let batches: Vec<RecordBatch> = reader.try_collect().await.unwrap(); |
| assert_eq!(batches.len(), 1); |
| assert_eq!(batches[0].num_rows(), 2); |
| } |
| |
| #[tokio::test] |
| async fn test_builder_options_combined() { |
| // Test combining multiple builder options |
| let file = arrow_test_data("avro/nested_records.avro"); |
| let store: Arc<dyn ObjectStore> = Arc::new(LocalFileSystem::new()); |
| let location = Path::from_filesystem_path(&file).unwrap(); |
| let file_size = store.head(&location).await.unwrap().size; |
| |
| let file_reader = AvroObjectReader::new(store, location); |
| |
| let reader = AsyncAvroFileReader::builder(file_reader, file_size, 2) |
| .with_header_size_hint(128) |
| .with_utf8_view(true) |
| .with_strict_mode(true) |
| .with_projection(vec![0, 2]) // f1 and f3 |
| .try_build() |
| .await |
| .unwrap(); |
| |
| let batches: Vec<RecordBatch> = reader.try_collect().await.unwrap(); |
| let batch = &batches[0]; |
| |
| // Should have 2 columns (f1 and f3) due to projection |
| assert_eq!(batch.num_columns(), 2); |
| assert_eq!(batch.schema().field(0).name(), "f1"); |
| assert_eq!(batch.schema().field(1).name(), "f3"); |
| |
| // Verify utf8_view is applied |
| let f1_col = batch.column(0); |
| let f1_struct = f1_col.as_struct(); |
| let f1_1_field = f1_struct.column_by_name("f1_1").unwrap(); |
| assert_eq!(f1_1_field.data_type(), &DataType::Utf8View); |
| } |
| } |