blob: d3cca70425de3d934799a2a8d413ccec8c8930e2 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
use crate::codec::{AvroFieldBuilder, Tz};
use crate::errors::AvroError;
use crate::reader::async_reader::ReaderState;
use crate::reader::header::{Header, HeaderDecoder, HeaderInfo};
use crate::reader::record::RecordDecoder;
use crate::reader::{AsyncAvroFileReader, AsyncFileReader, Decoder};
use crate::schema::{AvroSchema, FingerprintAlgorithm};
use indexmap::IndexMap;
use std::ops::Range;
/// Number of bytes requested per fetch while reading the file header when the
/// caller does not supply a header size hint.
const DEFAULT_HEADER_SIZE_HINT: u64 = 16 * 1024; // 16 KB
/// Builder for an asynchronous Avro file reader.
///
/// Collects the reader, the file size and the optional read settings; the
/// `try_build` / `build_with_header` methods turn it into an `AsyncAvroFileReader`.
pub struct ReaderBuilder<R> {
/// Source the file bytes are fetched from.
reader: R,
/// Size of the file in bytes; used as the upper bound of every byte range that is read.
file_size: u64,
/// Batch size handed to the `Decoder` (presumably the maximum rows per batch; confirm against `Decoder`).
batch_size: usize,
/// Optional byte range to restrict reading to; `None` reads the whole file.
range: Option<Range<u64>>,
/// Optional reader schema; when absent the writer schema from the header is used.
reader_schema: Option<AvroSchema>,
/// Optional column indices used to project the reader (or writer) schema.
projection: Option<Vec<usize>>,
/// Optional size, in bytes, of each fetch performed while reading the header.
header_size_hint: Option<u64>,
/// Forwarded to `AvroFieldBuilder::with_utf8view`.
utf8_view: bool,
/// Forwarded to `AvroFieldBuilder::with_strict_mode`.
strict_mode: bool,
/// Forwarded to `AvroFieldBuilder::with_tz`.
tz: Tz,
}
impl<R> ReaderBuilder<R> {
pub(super) fn new(reader: R, file_size: u64, batch_size: usize) -> Self {
Self {
reader,
file_size,
batch_size,
range: None,
reader_schema: None,
projection: None,
header_size_hint: None,
utf8_view: false,
strict_mode: false,
tz: Default::default(),
}
}
/// Specify a byte range to read from the Avro file.
/// If this is provided, the reader will read all the blocks within the specified range,
/// if the range ends mid-block, it will attempt to fetch the remaining bytes to complete the block,
/// but no further blocks will be read.
/// If this is omitted, the full file will be read.
pub fn with_range(self, range: Range<u64>) -> Self {
Self {
range: Some(range),
..self
}
}
/// Specify a reader schema to use when reading the Avro file.
/// This can be useful to project specific columns or handle schema evolution.
/// If this is not provided, the schema will be derived from the Arrow schema provided.
pub fn with_reader_schema(self, reader_schema: AvroSchema) -> Self {
Self {
reader_schema: Some(reader_schema),
..self
}
}
/// Specify a projection of column indices to read from the Avro file.
/// This can help optimize reading by only fetching the necessary columns.
pub fn with_projection(self, projection: Vec<usize>) -> Self {
Self {
projection: Some(projection),
..self
}
}
/// Provide a hint for the expected size of the Avro header in bytes.
/// This can help optimize the initial read operation when fetching the header.
pub fn with_header_size_hint(self, hint: u64) -> Self {
Self {
header_size_hint: Some(hint),
..self
}
}
/// Enable usage of Utf8View types when reading string data.
pub fn with_utf8_view(self, utf8_view: bool) -> Self {
Self { utf8_view, ..self }
}
/// Enable strict mode for schema validation and data reading.
pub fn with_strict_mode(self, strict_mode: bool) -> Self {
Self {
strict_mode,
..self
}
}
/// Sets the timezone representation for Avro timestamp fields.
///
/// The default is `Tz::OffsetZero`, meaning the "+00:00" time zone ID.
pub fn with_tz(mut self, tz: Tz) -> Self {
self.tz = tz;
self
}
}
/// Asynchronously reads the Avro file header (magic, metadata, sync marker) from `reader`.
///
/// `file_size` bounds how far into the file the header fetches may reach and
/// `header_size_hint` optionally sets the size of each fetch.
///
/// On success, returns a [`HeaderInfo`] built from the parsed header and the
/// header's length in bytes.
///
/// # Errors
///
/// Returns any [`AvroError`] raised while fetching or decoding the header.
pub async fn read_header_info<R>(
    reader: &mut R,
    file_size: u64,
    header_size_hint: Option<u64>,
) -> Result<HeaderInfo, AvroError>
where
    R: AsyncFileReader,
{
    let (header, header_len) = read_header(reader, file_size, header_size_hint).await?;
    Ok(HeaderInfo::new(header, header_len))
}
/// Fetches and decodes the Avro file header, returning it together with the
/// number of bytes the decoder consumed (the header length).
///
/// The file is read from offset 0 in chunks of `header_size_hint` bytes, never
/// reaching past `file_size`, until the decoder stops consuming a whole chunk
/// or the end of the file is reached. A missing or zero hint falls back to
/// [`DEFAULT_HEADER_SIZE_HINT`].
///
/// # Errors
///
/// * [`AvroError::General`] if the reader fails to fetch a chunk.
/// * [`AvroError::EOF`] if the reader returns no bytes for a non-empty range, or
///   if the data ends before a complete header was decoded.
/// * Any error produced by the header decoder.
async fn read_header<R>(
    reader: &mut R,
    file_size: u64,
    header_size_hint: Option<u64>,
) -> Result<(Header, u64), AvroError>
where
    R: AsyncFileReader,
{
    // A zero hint would make the very first fetch range empty and surface as a
    // misleading "unexpected EOF", so treat it like an absent hint.
    let chunk_size = header_size_hint
        .filter(|&hint| hint > 0)
        .unwrap_or(DEFAULT_HEADER_SIZE_HINT);
    let mut decoder = HeaderDecoder::default();
    let mut position: u64 = 0;
    loop {
        // Saturate so an oversized hint clamps to the file size instead of overflowing.
        let range_to_fetch = position..position.saturating_add(chunk_size).min(file_size);
        // Maybe EOF after the header, no actual data
        if range_to_fetch.is_empty() {
            break;
        }
        let current_data = reader.get_bytes(range_to_fetch).await.map_err(|err| {
            AvroError::General(format!(
                "Error fetching Avro header from file reader: {err}"
            ))
        })?;
        if current_data.is_empty() {
            return Err(AvroError::EOF(
                "Unexpected EOF while fetching header data".into(),
            ));
        }
        let read = current_data.len();
        let decoded = decoder.decode(&current_data)?;
        position += decoded as u64;
        // The decoder left bytes unconsumed, so it stopped inside this chunk;
        // `position` now holds the total number of bytes it consumed.
        if decoded != read {
            break;
        }
    }
    decoder
        .flush()
        .map(|header| (header, position))
        .ok_or_else(|| AvroError::EOF("Unexpected EOF while reading Avro header".into()))
}
impl<R: AsyncFileReader> ReaderBuilder<R> {
    /// Build the asynchronous Avro reader with the provided parameters.
    ///
    /// The header is read from the beginning of the file first; the writer schema
    /// and the remaining reader state are then initialized from it.
    ///
    /// # Errors
    ///
    /// Returns [`AvroError::InvalidArgument`] if the file size is 0, and otherwise
    /// any error raised while reading the header or by [`Self::build_with_header`].
    pub async fn try_build(mut self) -> Result<AsyncAvroFileReader<R>, AvroError> {
        if self.file_size == 0 {
            return Err(AvroError::InvalidArgument("File size cannot be 0".into()));
        }
        // Start by reading the header from the beginning of the avro file,
        // the writer schema is taken from the header.
        let header_info =
            read_header_info(&mut self.reader, self.file_size, self.header_size_hint).await?;
        self.build_with_header(header_info)
    }

    /// Build the asynchronous Avro reader with the provided header.
    ///
    /// This allows initializing the reader with pre-parsed header information.
    /// Note that this method is not async because it does not need to perform any I/O operations.
    ///
    /// Note: Any `header_size_hint` set via [`Self::with_header_size_hint`] is not used
    /// when building with a pre-parsed header, since no header fetching occurs.
    ///
    /// # Errors
    ///
    /// Returns an error if the writer schema or compression codec cannot be obtained
    /// from the header, if the projection or schema resolution fails, if the record
    /// decoder cannot be created, or ([`AvroError::ParseError`]) if a range was
    /// requested and the header is shorter than a sync marker.
    pub fn build_with_header(
        self,
        header_info: HeaderInfo,
    ) -> Result<AsyncAvroFileReader<R>, AvroError> {
        // Length in bytes of the sync marker that terminates the header.
        const SYNC_MARKER_LEN: u64 = 16;

        let writer_schema = header_info.writer_schema()?;
        // If a projection exists, project the reader schema; if no reader schema was
        // provided, project the writer schema taken from the header instead.
        // The projected schema is the schema used for reading.
        let projected_reader_schema = self
            .projection
            .as_deref()
            .map(|projection| {
                let base_schema = if let Some(reader_schema) = &self.reader_schema {
                    reader_schema
                } else {
                    &writer_schema
                };
                base_schema.project(projection)
            })
            .transpose()?;
        // Use either the projected reader schema or the original reader schema (if no projection).
        // Both are optional: at worst no reader schema is available, in which case
        // the data is read with the writer schema.
        let effective_reader_schema = projected_reader_schema
            .as_ref()
            .or(self.reader_schema.as_ref())
            .map(|s| s.schema())
            .transpose()?;
        let root = {
            let writer_schema = writer_schema.schema()?;
            let mut builder = AvroFieldBuilder::new(&writer_schema);
            if let Some(reader_schema) = &effective_reader_schema {
                builder = builder.with_reader_schema(reader_schema);
            }
            builder
                .with_utf8view(self.utf8_view)
                .with_strict_mode(self.strict_mode)
                .with_tz(self.tz)
                .build()
        }?;
        let record_decoder = RecordDecoder::try_new_with_options(root.data_type())?;
        let decoder = Decoder::from_parts(
            self.batch_size,
            record_decoder,
            None,
            IndexMap::new(),
            FingerprintAlgorithm::Rabin,
        );
        let header_len = header_info.header_len();
        let range = match self.range {
            Some(r) => {
                // If the requested range starts inside the header, the header bytes must be
                // skipped. The start is then moved back by the sync marker length so the
                // header's sync marker is included: the reader searches the data for the
                // first sync marker (after which a block starts) and reads blocks from there.
                let first_sync_start =
                    header_len.checked_sub(SYNC_MARKER_LEN).ok_or_else(|| {
                        AvroError::ParseError(
                            "Avro header length overflow, header was not long enough to contain avro bytes"
                                .to_string(),
                        )
                    })?;
                let start = r.start.max(first_sync_start);
                // Clamp to the file size first and to `start` last, so the end can never
                // fall below the start even when the start lies beyond the end of the
                // file; worst case the range is empty.
                let end = r.end.min(self.file_size).max(start);
                start..end
            }
            None => 0..self.file_size,
        };
        // Determine if there is actually data to fetch. The range start may have been
        // moved back into the header, so also compare the range end with the header
        // length to detect that there is no data after the header.
        let reader_state = if range.start == range.end || header_len == range.end {
            ReaderState::Finished
        } else {
            ReaderState::Idle {
                reader: self.reader,
            }
        };
        let codec = header_info.compression()?;
        let sync_marker = header_info.sync();
        Ok(AsyncAvroFileReader::new(
            range,
            self.file_size,
            decoder,
            codec,
            sync_marker,
            reader_state,
        ))
    }
}