blob: 9f9fc0d94bb1c88ebee3f4508b36d330e6285d1e [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//! Module containing helper methods for the various file formats.
//! See `write.rs` for write-related helper methods.
use ::arrow::array::RecordBatch;
use arrow::error::ArrowError;
use bytes::Buf;
use bytes::Bytes;
use datafusion_common::Result;
use futures::StreamExt as _;
use futures::stream::BoxStream;
use futures::{Stream, ready};
use std::collections::VecDeque;
use std::fmt;
use std::task::Poll;
/// The result of asking a [`BatchDeserializer`] to make progress.
#[derive(PartialEq, Debug)]
pub enum DeserializerOutput {
    /// One fully deserialized [`RecordBatch`] is ready for the caller.
    RecordBatch(RecordBatch),
    /// Nothing can be produced until more input has been digested.
    RequiresMoreData,
    /// All input has been consumed; callers treat this as the end of output.
    InputExhausted,
}
/// A pluggable strategy for turning a sequence of messages of type `T` into
/// [`RecordBatch`]es.
///
/// Callers push input with [`digest`](Self::digest), pull output with
/// [`next`](Self::next), and signal end-of-input with [`finish`](Self::finish).
pub trait BatchDeserializer<T>: Send + fmt::Debug {
    /// Queues `message` for deserialization and returns the number of bytes
    /// accepted from it.
    ///
    /// This may be invoked several times in a row before `next` is called;
    /// each message is queued behind the ones digested earlier.
    fn digest(&mut self, message: T) -> usize;

    /// Tries to make progress on the queued messages and reports the outcome
    /// as a [`DeserializerOutput`].
    fn next(&mut self) -> Result<DeserializerOutput, ArrowError>;

    /// Signals that no further messages will be passed to `digest`.
    fn finish(&mut self);
}
/// Common abstraction over streaming decoders such as
/// [`arrow::json::reader::Decoder`] and [`arrow::csv::reader::Decoder`].
///
/// It mirrors their [`Decoder::decode`] and [`Decoder::flush`] methods. It also
/// adds [`can_flush_early`](Self::can_flush_early), which reports whether the
/// decoder may be flushed before its batch size is reached. It is designed to be
/// driven by [`DecoderDeserializer`].
///
/// [`arrow::json::reader::Decoder`]: ::arrow::json::reader::Decoder
/// [`arrow::csv::reader::Decoder`]: ::arrow::csv::reader::Decoder
/// [`Decoder::decode`]: ::arrow::json::reader::Decoder::decode
/// [`Decoder::flush`]: ::arrow::json::reader::Decoder::flush
pub trait Decoder: Send + fmt::Debug {
    /// Decodes bytes from `buf` and returns how many of them were consumed.
    ///
    /// Mirrors [`arrow::json::reader::Decoder::decode`].
    ///
    /// [`arrow::json::reader::Decoder::decode`]: ::arrow::json::reader::Decoder::decode
    fn decode(&mut self, buf: &[u8]) -> Result<usize, ArrowError>;

    /// Emits the rows decoded so far as a batch, if there are any.
    ///
    /// Mirrors [`arrow::json::reader::Decoder::flush`].
    ///
    /// [`arrow::json::reader::Decoder::flush`]: ::arrow::json::reader::Decoder::flush
    fn flush(&mut self) -> Result<Option<RecordBatch>, ArrowError>;

    /// Reports whether, in its present state, the decoder may be flushed
    /// before it is full.
    fn can_flush_early(&self) -> bool;
}
impl<T: Decoder> fmt::Debug for DecoderDeserializer<T> {
    /// Shows the pending input queue and the finalization flag; the wrapped
    /// decoder itself is not printed.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let Self {
            buffered_queue,
            finalized,
            ..
        } = self;
        let mut builder = f.debug_struct("Deserializer");
        builder.field("buffered_queue", buffered_queue);
        builder.field("finalized", finalized);
        builder.finish()
    }
}
impl<T: Decoder> BatchDeserializer<Bytes> for DecoderDeserializer<T> {
    /// Appends `message` to the pending queue and reports its full length as
    /// consumed. Empty messages are not queued and report 0.
    fn digest(&mut self, message: Bytes) -> usize {
        let len = message.len();
        if len > 0 {
            self.buffered_queue.push_back(message);
        }
        len
    }

    /// Feeds queued buffers to the decoder, front to back. It flushes whenever
    /// `decode` consumes nothing or the decoder says it can flush early, and
    /// returns the first batch that `flush` yields. Once the queue is drained,
    /// it reports `InputExhausted` if `finish` was called and
    /// `RequiresMoreData` otherwise.
    fn next(&mut self) -> Result<DeserializerOutput, ArrowError> {
        while let Some(front) = self.buffered_queue.front_mut() {
            let consumed = self.decoder.decode(front)?;
            front.advance(consumed);
            if front.is_empty() {
                // Fully consumed buffers leave the queue. This includes the
                // empty sentinel that `finish` enqueues.
                self.buffered_queue.pop_front();
            }

            // Flush when the input ends (the empty sentinel decodes to 0 bytes)
            // or the batch is full (decode refuses more bytes). Some decoders
            // may also flush early.
            let flush_now = consumed == 0 || self.decoder.can_flush_early();
            if !flush_now {
                continue;
            }
            // NOTE(review): if a decoder returns 0 for a non-empty buffer and
            // then flushes nothing, this loop cannot make progress. Confirm that
            // implementations never do that.
            if let Some(batch) = self.decoder.flush()? {
                return Ok(DeserializerOutput::RecordBatch(batch));
            }
        }

        Ok(if self.finalized {
            DeserializerOutput::InputExhausted
        } else {
            DeserializerOutput::RequiresMoreData
        })
    }

    /// Marks the input as complete and enqueues an empty buffer. That buffer
    /// makes the next call to `next` flush whatever the decoder still holds.
    fn finish(&mut self) {
        self.buffered_queue.push_back(Bytes::new());
        self.finalized = true;
    }
}
/// Adapts any [`Decoder`] to the [`BatchDeserializer`] interface.
///
/// Incoming byte chunks are queued until the decoder can take them, so callers
/// may hand over input in whatever chunk sizes they receive it. This struct
/// holds no schema or format options itself; any such configuration lives in
/// the wrapped decoder.
pub struct DecoderDeserializer<T: Decoder> {
    /// Decoder that performs the actual byte-to-batch conversion.
    pub(crate) decoder: T,
    /// Chunks that have been digested but not yet fully decoded.
    pub(crate) buffered_queue: VecDeque<Bytes>,
    /// Set once the caller has signalled that no more input will arrive.
    pub(crate) finalized: bool,
}
impl<T: Decoder> DecoderDeserializer<T> {
    /// Wraps `decoder` in a deserializer that has an empty input queue and
    /// has not yet been finalized.
    pub fn new(decoder: T) -> Self {
        let buffered_queue = VecDeque::new();
        Self {
            finalized: false,
            buffered_queue,
            decoder,
        }
    }
}
/// Deserializes a stream of bytes into a stream of [`RecordBatch`] objects using the
/// provided deserializer.
///
/// Returns a boxed stream of `Result<RecordBatch, ArrowError>`. The stream yields [`RecordBatch`]
/// objects as they are produced by the deserializer, or an [`ArrowError`] if an error
/// occurs while polling the input or deserializing.
///
/// On every poll, the deserializer is first asked for a batch it can already produce,
/// and only then is another chunk pulled from `input`. Batches that are decodable from
/// previously received chunks are therefore emitted without waiting on, or buffering
/// more of, the input. `input` is fused, so it is never polled again after it has
/// returned `None`.
pub fn deserialize_stream<'a>(
    input: impl Stream<Item = Result<Bytes>> + Unpin + Send + 'a,
    mut deserializer: impl BatchDeserializer<Bytes> + 'a,
) -> BoxStream<'a, Result<RecordBatch, ArrowError>> {
    // `Stream` places no requirements on `poll_next` after `None`; fusing makes
    // that case well-defined regardless of the concrete input stream.
    let mut input = input.fuse();
    futures::stream::poll_fn(move |cx| {
        loop {
            // Drain first. After `finish` has been called, `DecoderDeserializer`
            // answers `RecordBatch` or `InputExhausted`, never `RequiresMoreData`,
            // so the input is not consulted again once it has ended.
            match deserializer.next()? {
                DeserializerOutput::RecordBatch(rb) => return Poll::Ready(Some(Ok(rb))),
                DeserializerOutput::InputExhausted => return Poll::Ready(None),
                DeserializerOutput::RequiresMoreData => {}
            }
            match ready!(input.poll_next_unpin(cx)).transpose()? {
                Some(b) => _ = deserializer.digest(b),
                None => deserializer.finish(),
            }
        }
    })
    .boxed()
}