ARROW-7659: [Rust] Reduce Rc usage
Closes #6263 from 95th/master and squashes the following commits:
4d4707b18 <Gurwinder Singh> use ref instead of Rc bump
0166584b7 <Gurwinder Singh> Replace ColumnChunkMetaDataPtr
9db1f0971 <Gurwinder Singh> Reduce Rc usage in Parquet
Authored-by: Gurwinder Singh <vargwin@gmail.com>
Signed-off-by: Neville Dipale <nevilledips@gmail.com>
diff --git a/rust/parquet/src/arrow/arrow_reader.rs b/rust/parquet/src/arrow/arrow_reader.rs
index 2e06739..911b707 100644
--- a/rust/parquet/src/arrow/arrow_reader.rs
+++ b/rust/parquet/src/arrow/arrow_reader.rs
@@ -100,7 +100,7 @@
.file_reader
.metadata()
.file_metadata()
- .schema_descr_ptr()
+ .schema_descr()
.num_columns();
self.get_record_reader_by_columns(column_indices, batch_size)
diff --git a/rust/parquet/src/file/metadata.rs b/rust/parquet/src/file/metadata.rs
index 1f40e68..ab10b32 100644
--- a/rust/parquet/src/file/metadata.rs
+++ b/rust/parquet/src/file/metadata.rs
@@ -45,31 +45,25 @@
Type as SchemaType, TypePtr,
};
-/// Reference counted pointer for [`ParquetMetaData`].
-pub type ParquetMetaDataPtr = Rc<ParquetMetaData>;
-
/// Global Parquet metadata.
pub struct ParquetMetaData {
- file_metadata: FileMetaDataPtr,
- row_groups: Vec<RowGroupMetaDataPtr>,
+ file_metadata: FileMetaData,
+ row_groups: Vec<RowGroupMetaData>,
}
impl ParquetMetaData {
- /// Creates Parquet metadata from file metadata and a list of row group metadata `Rc`s
+ /// Creates Parquet metadata from file metadata and a list of row group metadata
/// for each available row group.
- pub fn new(
- file_metadata: FileMetaData,
- row_group_ptrs: Vec<RowGroupMetaDataPtr>,
- ) -> Self {
+ pub fn new(file_metadata: FileMetaData, row_groups: Vec<RowGroupMetaData>) -> Self {
ParquetMetaData {
- file_metadata: Rc::new(file_metadata),
- row_groups: row_group_ptrs,
+ file_metadata,
+ row_groups,
}
}
- /// Returns file metadata as reference counted clone.
- pub fn file_metadata(&self) -> FileMetaDataPtr {
- self.file_metadata.clone()
+ /// Returns file metadata as reference.
+ pub fn file_metadata(&self) -> &FileMetaData {
+ &self.file_metadata
}
/// Returns number of row groups in this file.
@@ -79,13 +73,13 @@
/// Returns row group metadata for `i`th position.
/// Position should be less than number of row groups `num_row_groups`.
- pub fn row_group(&self, i: usize) -> RowGroupMetaDataPtr {
- self.row_groups[i].clone()
+ pub fn row_group(&self, i: usize) -> &RowGroupMetaData {
+ &self.row_groups[i]
}
- /// Returns slice of row group reference counted pointers in this file.
- pub fn row_groups(&self) -> &[RowGroupMetaDataPtr] {
- &self.row_groups.as_slice()
+ /// Returns slice of row groups in this file.
+ pub fn row_groups(&self) -> &[RowGroupMetaData] {
+ &self.row_groups
}
}
@@ -185,7 +179,7 @@
/// Metadata for a row group.
pub struct RowGroupMetaData {
- columns: Vec<ColumnChunkMetaDataPtr>,
+ columns: Vec<ColumnChunkMetaData>,
num_rows: i64,
total_byte_size: i64,
schema_descr: SchemaDescPtr,
@@ -207,8 +201,8 @@
&self.columns[i]
}
- /// Returns slice of column chunk metadata [`Rc`] pointers.
- pub fn columns(&self) -> &[ColumnChunkMetaDataPtr] {
+ /// Returns slice of column chunk metadata.
+ pub fn columns(&self) -> &[ColumnChunkMetaData] {
&self.columns
}
@@ -243,7 +237,7 @@
let mut columns = vec![];
for (c, d) in rg.columns.drain(0..).zip(schema_descr.columns()) {
let cc = ColumnChunkMetaData::from_thrift(d.clone(), c)?;
- columns.push(Rc::new(cc));
+ columns.push(cc);
}
Ok(RowGroupMetaData {
columns,
@@ -266,7 +260,7 @@
/// Builder for row group metadata.
pub struct RowGroupMetaDataBuilder {
- columns: Vec<ColumnChunkMetaDataPtr>,
+ columns: Vec<ColumnChunkMetaData>,
schema_descr: SchemaDescPtr,
num_rows: i64,
total_byte_size: i64,
@@ -296,7 +290,7 @@
}
/// Sets column metadata for this row group.
- pub fn set_column_metadata(mut self, value: Vec<ColumnChunkMetaDataPtr>) -> Self {
+ pub fn set_column_metadata(mut self, value: Vec<ColumnChunkMetaData>) -> Self {
self.columns = value;
self
}
@@ -320,9 +314,6 @@
}
}
-/// Reference counted pointer for [`ColumnChunkMetaData`].
-pub type ColumnChunkMetaDataPtr = Rc<ColumnChunkMetaData>;
-
/// Metadata for a column chunk.
pub struct ColumnChunkMetaData {
column_type: Type,
@@ -642,7 +633,7 @@
let mut columns = vec![];
for ptr in schema_descr.columns() {
let column = ColumnChunkMetaData::builder(ptr.clone()).build().unwrap();
- columns.push(Rc::new(column));
+ columns.push(column);
}
let row_group_meta = RowGroupMetaData::builder(schema_descr.clone())
.set_num_rows(1000)
diff --git a/rust/parquet/src/file/reader.rs b/rust/parquet/src/file/reader.rs
index 123fd84..f2881a3 100644
--- a/rust/parquet/src/file/reader.rs
+++ b/rust/parquet/src/file/reader.rs
@@ -55,13 +55,13 @@
/// Parquet file, can get reader for each row group, and access record iterator.
pub trait FileReader {
/// Get metadata information about this file.
- fn metadata(&self) -> ParquetMetaDataPtr;
+ fn metadata(&self) -> &ParquetMetaData;
/// Get the total number of row groups for this file.
fn num_row_groups(&self) -> usize;
/// Get the `i`th row group reader. Note this doesn't do bound check.
- fn get_row_group(&self, i: usize) -> Result<Box<RowGroupReader>>;
+ fn get_row_group(&self, i: usize) -> Result<Box<RowGroupReader + '_>>;
/// Get full iterator of `Row`s from a file (over all row groups).
///
@@ -76,7 +76,7 @@
/// row group, as well as readers for each individual column chunk.
pub trait RowGroupReader {
/// Get metadata information about this row group.
- fn metadata(&self) -> RowGroupMetaDataPtr;
+ fn metadata(&self) -> &RowGroupMetaData;
/// Get the total number of column chunks in this row group.
fn num_columns(&self) -> usize;
@@ -144,7 +144,7 @@
/// A serialized implementation for Parquet [`FileReader`].
pub struct SerializedFileReader<R: ParquetReader> {
buf: BufReader<R>,
- metadata: ParquetMetaDataPtr,
+ metadata: ParquetMetaData,
}
impl<R: ParquetReader> SerializedFileReader<R> {
@@ -153,10 +153,7 @@
pub fn new(reader: R) -> Result<Self> {
let mut buf = BufReader::new(reader);
let metadata = Self::parse_metadata(&mut buf)?;
- Ok(Self {
- buf,
- metadata: Rc::new(metadata),
- })
+ Ok(Self { buf, metadata })
}
// Layout of Parquet file
@@ -205,10 +202,7 @@
let schema_descr = Rc::new(SchemaDescriptor::new(schema.clone()));
let mut row_groups = Vec::new();
for rg in t_file_metadata.row_groups {
- row_groups.push(Rc::new(RowGroupMetaData::from_thrift(
- schema_descr.clone(),
- rg,
- )?));
+ row_groups.push(RowGroupMetaData::from_thrift(schema_descr.clone(), rg)?);
}
let column_orders =
Self::parse_column_orders(t_file_metadata.column_orders, &schema_descr);
@@ -258,15 +252,15 @@
}
impl<R: 'static + ParquetReader> FileReader for SerializedFileReader<R> {
- fn metadata(&self) -> ParquetMetaDataPtr {
- self.metadata.clone()
+ fn metadata(&self) -> &ParquetMetaData {
+ &self.metadata
}
fn num_row_groups(&self) -> usize {
self.metadata.num_row_groups()
}
- fn get_row_group(&self, i: usize) -> Result<Box<RowGroupReader>> {
+ fn get_row_group(&self, i: usize) -> Result<Box<RowGroupReader + '_>> {
let row_group_metadata = self.metadata.row_group(i);
// Row groups should be processed sequentially.
let f = self.buf.get_ref().try_clone()?;
@@ -326,22 +320,22 @@
}
/// A serialized implementation for Parquet [`RowGroupReader`].
-pub struct SerializedRowGroupReader<R: ParquetReader> {
+pub struct SerializedRowGroupReader<'a, R: ParquetReader> {
buf: BufReader<R>,
- metadata: RowGroupMetaDataPtr,
+ metadata: &'a RowGroupMetaData,
}
-impl<R: 'static + ParquetReader> SerializedRowGroupReader<R> {
+impl<'a, R: 'static + ParquetReader> SerializedRowGroupReader<'a, R> {
/// Creates new row group reader from a file and row group metadata.
- fn new(file: R, metadata: RowGroupMetaDataPtr) -> Self {
+ fn new(file: R, metadata: &'a RowGroupMetaData) -> Self {
let buf = BufReader::new(file);
Self { buf, metadata }
}
}
-impl<R: 'static + ParquetReader> RowGroupReader for SerializedRowGroupReader<R> {
- fn metadata(&self) -> RowGroupMetaDataPtr {
- self.metadata.clone()
+impl<'a, R: 'static + ParquetReader> RowGroupReader for SerializedRowGroupReader<'a, R> {
+ fn metadata(&self) -> &RowGroupMetaData {
+ &self.metadata
}
fn num_columns(&self) -> usize {
@@ -602,7 +596,7 @@
let num_columns = file_reader
.metadata()
.file_metadata()
- .schema_descr_ptr()
+ .schema_descr()
.num_columns();
if column_index >= num_columns {
diff --git a/rust/parquet/src/file/writer.rs b/rust/parquet/src/file/writer.rs
index 14a2703..f5105db 100644
--- a/rust/parquet/src/file/writer.rs
+++ b/rust/parquet/src/file/writer.rs
@@ -265,7 +265,7 @@
column_index: usize,
previous_writer_closed: bool,
row_group_metadata: Option<RowGroupMetaDataPtr>,
- column_chunks: Vec<ColumnChunkMetaDataPtr>,
+ column_chunks: Vec<ColumnChunkMetaData>,
}
impl<W: 'static + ParquetWriter> SerializedRowGroupWriter<W> {
@@ -303,7 +303,7 @@
// Update row group writer metrics
self.total_bytes_written += bytes_written;
- self.column_chunks.push(Rc::new(metadata));
+ self.column_chunks.push(metadata);
if let Some(rows) = self.total_rows_written {
if rows != rows_written {
return Err(general_err!(
@@ -372,8 +372,9 @@
if self.row_group_metadata.is_none() {
self.assert_previous_writer_closed()?;
+ let column_chunks = std::mem::replace(&mut self.column_chunks, vec![]);
let row_group_metadata = RowGroupMetaData::builder(self.descr.clone())
- .set_column_metadata(self.column_chunks.clone())
+ .set_column_metadata(column_chunks)
.set_total_byte_size(self.total_bytes_written as i64)
.set_num_rows(self.total_rows_written.unwrap_or(0) as i64)
.build()?;
diff --git a/rust/parquet/src/record/triplet.rs b/rust/parquet/src/record/triplet.rs
index 9915b18..86c767f 100644
--- a/rust/parquet/src/record/triplet.rs
+++ b/rust/parquet/src/record/triplet.rs
@@ -518,8 +518,9 @@
) {
let file = get_test_file(file_name);
let file_reader = SerializedFileReader::new(file).unwrap();
+ let metadata = file_reader.metadata();
// Get schema descriptor
- let file_metadata = file_reader.metadata().file_metadata();
+ let file_metadata = metadata.file_metadata();
let schema = file_metadata.schema_descr();
// Get first row group
let row_group_reader = file_reader.get_row_group(0).unwrap();