// Source blob: 44e99ac0a77998253befc80ad2a32f6285836513
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
use std::{cell::RefCell, cmp, fmt, io::*};
use crate::file::{reader::Length, writer::ParquetWriter};
/// Size in bytes (8 KiB) of the read-ahead buffer that `FileSource::new` allocates.
const DEFAULT_BUF_SIZE: usize = 8 * 1024;
// ----------------------------------------------------------------------
/// TryClone tries to clone the type and should maintain the `Seek` position of the given
/// instance.
pub trait TryClone: Sized {
    /// Clones the type returning a new instance or an error if it's not possible
    /// to clone it.
    ///
    /// # Errors
    ///
    /// Returns an I/O error when the implementor is unable to produce a clone.
    fn try_clone(&self) -> Result<Self>;
}
/// ParquetReader is the set of capabilities a parquet source has to provide before it
/// can be parsed: reading, seeking, reporting its length and being cloneable.
pub trait ParquetReader: Read + Seek + Length + TryClone {}

/// Every type that already offers all of the required capabilities is a `ParquetReader`.
impl<T> ParquetReader for T where T: Read + Seek + Length + TryClone {}
// Read/Write wrappers for `File`.
/// Position trait returns the current position in the stream.
/// Should be viewed as a lighter version of `Seek` that does not allow seek operations,
/// and does not require mutable reference for the current position.
pub trait Position {
    /// Returns position in the stream.
    fn pos(&self) -> u64;
}
/// Struct that represents a slice of a file data with independent start position and
/// length. Internally clones provided file handle, wraps with a custom implementation
/// of BufReader that resets position before any read.
/// This is workaround and alternative for `file.try_clone()` method. It clones `File`
/// while preserving independent position, which is not available with `try_clone()`.
/// Designed after `arrow::io::RandomAccessFile` and `std::io::BufReader`
pub struct FileSource<R: ParquetReader> {
reader: RefCell<R>,
start: u64, // start position in a file
end: u64, // end position in a file
buf: Vec<u8>, // buffer where bytes read in advance are stored
buf_pos: usize, // current position of the reader in the buffer
buf_cap: usize, // current number of bytes read into the buffer
impl<R: ParquetReader> fmt::Debug for FileSource<R> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
.field("reader", &"OPAQUE")
.field("start", &self.start)
.field("end", &self.end)
.field("buf.len", &self.buf.len())
.field("buf_pos", &self.buf_pos)
.field("buf_cap", &self.buf_cap)
impl<R: ParquetReader> FileSource<R> {
/// Creates new file reader with start and length from a file handle
pub fn new(fd: &R, start: u64, length: usize) -> Self {
let reader = RefCell::new(fd.try_clone().unwrap());
Self {
end: start + length as u64,
buf: vec![0_u8; DEFAULT_BUF_SIZE],
buf_pos: 0,
buf_cap: 0,
fn fill_inner_buf(&mut self) -> Result<&[u8]> {
if self.buf_pos >= self.buf_cap {
// If we've reached the end of our internal buffer then we need to fetch
// some more data from the underlying reader.
// Branch using `>=` instead of the more correct `==`
// to tell the compiler that the pos..cap slice is always valid.
debug_assert!(self.buf_pos == self.buf_cap);
let mut reader = self.reader.borrow_mut();; // always seek to start before reading
self.buf_cap = self.buf)?;
self.buf_pos = 0;
fn skip_inner_buf(&mut self, buf: &mut [u8]) -> Result<usize> {
// discard buffer
self.buf_pos = 0;
self.buf_cap = 0;
// read directly into param buffer
let mut reader = self.reader.borrow_mut();; // always seek to start before reading
let nread =;
self.start += nread as u64;
impl<R: ParquetReader> Read for FileSource<R> {
fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
let bytes_to_read = cmp::min(buf.len(), (self.end - self.start) as usize);
let buf = &mut buf[0..bytes_to_read];
// If we don't have any buffered data and we're doing a massive read
// (larger than our internal buffer), bypass our internal buffer
// entirely.
if self.buf_pos == self.buf_cap && buf.len() >= self.buf.len() {
return self.skip_inner_buf(buf);
let nread = {
let mut rem = self.fill_inner_buf()?;
// copy the data from the inner buffer to the param buffer
// consume from buffer
self.buf_pos = cmp::min(self.buf_pos + nread, self.buf_cap);
self.start += nread as u64;
impl<R: ParquetReader> Position for FileSource<R> {
fn pos(&self) -> u64 {
impl<R: ParquetReader> Length for FileSource<R> {
fn len(&self) -> u64 {
self.end - self.start
/// Struct that represents `File` output stream with position tracking.
/// Used as a sink in file writer.
pub struct FileSink<W: ParquetWriter> {
buf: BufWriter<W>,
// This is not necessarily position in the underlying file,
// but rather current position in the sink.
pos: u64,
impl<W: ParquetWriter> FileSink<W> {
/// Creates new file sink.
/// Position is set to whatever position file has.
pub fn new(buf: &W) -> Self {
let mut owned_buf = buf.try_clone().unwrap();
let pos =;
Self {
buf: BufWriter::new(owned_buf),
impl<W: ParquetWriter> Write for FileSink<W> {
fn write(&mut self, buf: &[u8]) -> Result<usize> {
let num_bytes = self.buf.write(buf)?;
self.pos += num_bytes as u64;
fn flush(&mut self) -> Result<()> {
impl<W: ParquetWriter> Position for FileSink<W> {
fn pos(&self) -> u64 {
// Position implementation for Cursor to use in various tests.
impl<'a> Position for Cursor<&'a mut Vec<u8>> {
fn pos(&self) -> u64 {
// Tests for `FileSource` (bounded, buffered reads) and `FileSink` (position-tracking writes).
// NOTE(review): this module appears to come from a lossy extraction: closing braces and
// any attributes (e.g. `#[cfg(test)]` / `#[test]`) are absent and several statements are
// cut short. The notes below mark the damaged lines; confirm against the upstream file.
mod tests {
use super::*;
use std::iter;
use crate::util::test_common::{get_temp_file, get_test_file};
// A source limited to 4 bytes fills only the first 4 bytes of a larger buffer.
fn test_io_read_fully() {
let mut buf = vec![0; 8];
let mut src = FileSource::new(&get_test_file("alltypes_plain.parquet"), 0, 4);
// NOTE(review): truncated statement — presumably `src.read(&mut buf[..])`; confirm.
let bytes_read = buf[..]).unwrap();
assert_eq!(bytes_read, 4);
assert_eq!(buf, vec![b'P', b'A', b'R', b'1', 0, 0, 0, 0]);
// Two consecutive 2-byte reads together yield the same 4 bytes.
fn test_io_read_in_chunks() {
let mut buf = vec![0; 4];
let mut src = FileSource::new(&get_test_file("alltypes_plain.parquet"), 0, 4);
// NOTE(review): truncated statement — presumably `src.read(&mut buf[0..2])`; confirm.
let bytes_read = buf[0..2]).unwrap();
assert_eq!(bytes_read, 2);
// NOTE(review): truncated statement — presumably `src.read(&mut buf[2..])`; confirm.
let bytes_read = buf[2..]).unwrap();
assert_eq!(bytes_read, 2);
assert_eq!(buf, vec![b'P', b'A', b'R', b'1']);
// `pos()` follows the number of bytes consumed and stops at the source's end.
fn test_io_read_pos() {
let mut src = FileSource::new(&get_test_file("alltypes_plain.parquet"), 0, 4);
// NOTE(review): truncated statement — presumably `src.read(&mut [0; 1])`; confirm.
let _ = [0; 1]).unwrap();
assert_eq!(src.pos(), 1);
// NOTE(review): truncated statement — presumably `src.read(&mut [0; 4])`; confirm.
let _ = [0; 4]).unwrap();
assert_eq!(src.pos(), 4);
// Reading past the 4-byte limit returns 0 bytes and leaves the position unchanged.
fn test_io_read_over_limit() {
let mut src = FileSource::new(&get_test_file("alltypes_plain.parquet"), 0, 4);
// Read all bytes from source
// NOTE(review): truncated statement — presumably `src.read(&mut [0; 128])`; confirm.
let _ = [0; 128]).unwrap();
assert_eq!(src.pos(), 4);
// Try reading again, should return 0 bytes.
// NOTE(review): truncated statement — presumably `src.read(&mut [0; 128])`; confirm.
let bytes_read = [0; 128]).unwrap();
assert_eq!(bytes_read, 0);
assert_eq!(src.pos(), 4);
// Moving the original file handle must not affect what the source reads.
fn test_io_seek_switch() {
let mut buf = vec![0; 4];
let mut file = get_test_file("alltypes_plain.parquet");
let mut src = FileSource::new(&file, 0, 4);
// NOTE(review): the receiver of this `.expect(...)` is missing — presumably a
// `file.seek(...)` call; the seek offset is not recoverable from here.
.expect("File seek to a position");
// NOTE(review): truncated statement — presumably `src.read(&mut buf[..])`; confirm.
let bytes_read = buf[..]).unwrap();
assert_eq!(bytes_read, 4);
assert_eq!(buf, vec![b'P', b'A', b'R', b'1']);
// The sink starts at the file's current position and advances with every write.
fn test_io_write_with_pos() {
// NOTE(review): the doubled `;` suggests a dropped statement here — presumably a seek of
// `file` to offset 3, since the sink is expected to start at position 3; confirm.
let mut file = get_temp_file("file_sink_test", &[b'a', b'b', b'c']);;
// Write into sink
let mut sink = FileSink::new(&file);
assert_eq!(sink.pos(), 3);
sink.write_all(&[b'd', b'e', b'f', b'g']).unwrap();
assert_eq!(sink.pos(), 7);
// Read data using file chunk
let mut res = vec![0u8; 7];
let mut chunk =
FileSource::new(&file, 0, file.metadata().unwrap().len() as usize);
chunk.read_exact(&mut res[..]).unwrap();
assert_eq!(res, vec![b'a', b'b', b'c', b'd', b'e', b'f', b'g']);
// A read larger than the internal buffer still starts at the source's own offset.
fn test_io_large_read() {
// Generate repeated 'abcdef' pattern and write it into a file
// NOTE(review): the iterator chain is cut off after `iter::repeat(...)`; the adapters
// that flatten, bound and collect it into `Vec<u8>` are missing.
let patterned_data: Vec<u8> = iter::repeat(vec![0, 1, 2, 3, 4, 5])
// always use different temp files as test might be run in parallel
let mut file = get_temp_file("large_file_sink_test", &patterned_data);
// seek the underlying file to the first 'd';
// NOTE(review): the seek statement this comment describes is missing.
// create the FileSource reader that starts at pos 1 ('b')
let mut chunk = FileSource::new(&file, 1, patterned_data.len() - 1);
// read the 'b' at pos 1
let mut res = vec![0u8; 1];
chunk.read_exact(&mut res).unwrap();
assert_eq!(res, &[1]);
// the underlying file is sought to 'e';
// NOTE(review): the seek statement this comment describes is missing.
// now read large chunk that starts with 'c' (after 'b')
let mut res = vec![0u8; 2 * DEFAULT_BUF_SIZE];
chunk.read_exact(&mut res).unwrap();
// NOTE(review): the opening of this assertion (presumably `assert_eq!(` with `res` as the
// first argument) and its closing `);` are missing.
&patterned_data[2..2 + 2 * DEFAULT_BUF_SIZE],
"read buf and original data are not equal"