ARROW-7659: [Rust] Reduce Rc usage

Closes #6263 from 95th/master and squashes the following commits:

4d4707b18 <Gurwinder Singh> use ref instead of Rc bump
0166584b7 <Gurwinder Singh> Replace ColumnChunkMetaDataPtr
9db1f0971 <Gurwinder Singh> Reduce Rc usage in Parquet

Authored-by: Gurwinder Singh <vargwin@gmail.com>
Signed-off-by: Neville Dipale <nevilledips@gmail.com>
diff --git a/rust/parquet/src/arrow/arrow_reader.rs b/rust/parquet/src/arrow/arrow_reader.rs
index 2e06739..911b707 100644
--- a/rust/parquet/src/arrow/arrow_reader.rs
+++ b/rust/parquet/src/arrow/arrow_reader.rs
@@ -100,7 +100,7 @@
             .file_reader
             .metadata()
             .file_metadata()
-            .schema_descr_ptr()
+            .schema_descr()
             .num_columns();
 
         self.get_record_reader_by_columns(column_indices, batch_size)
diff --git a/rust/parquet/src/file/metadata.rs b/rust/parquet/src/file/metadata.rs
index 1f40e68..ab10b32 100644
--- a/rust/parquet/src/file/metadata.rs
+++ b/rust/parquet/src/file/metadata.rs
@@ -45,31 +45,25 @@
     Type as SchemaType, TypePtr,
 };
 
-/// Reference counted pointer for [`ParquetMetaData`].
-pub type ParquetMetaDataPtr = Rc<ParquetMetaData>;
-
 /// Global Parquet metadata.
 pub struct ParquetMetaData {
-    file_metadata: FileMetaDataPtr,
-    row_groups: Vec<RowGroupMetaDataPtr>,
+    file_metadata: FileMetaData,
+    row_groups: Vec<RowGroupMetaData>,
 }
 
 impl ParquetMetaData {
     /// Creates Parquet metadata from file metadata and a list of row group metadata `Rc`s
     /// for each available row group.
-    pub fn new(
-        file_metadata: FileMetaData,
-        row_group_ptrs: Vec<RowGroupMetaDataPtr>,
-    ) -> Self {
+    pub fn new(file_metadata: FileMetaData, row_groups: Vec<RowGroupMetaData>) -> Self {
         ParquetMetaData {
-            file_metadata: Rc::new(file_metadata),
-            row_groups: row_group_ptrs,
+            file_metadata,
+            row_groups,
         }
     }
 
-    /// Returns file metadata as reference counted clone.
-    pub fn file_metadata(&self) -> FileMetaDataPtr {
-        self.file_metadata.clone()
+    /// Returns file metadata as reference.
+    pub fn file_metadata(&self) -> &FileMetaData {
+        &self.file_metadata
     }
 
     /// Returns number of row groups in this file.
@@ -79,13 +73,13 @@
 
     /// Returns row group metadata for `i`th position.
     /// Position should be less than number of row groups `num_row_groups`.
-    pub fn row_group(&self, i: usize) -> RowGroupMetaDataPtr {
-        self.row_groups[i].clone()
+    pub fn row_group(&self, i: usize) -> &RowGroupMetaData {
+        &self.row_groups[i]
     }
 
-    /// Returns slice of row group reference counted pointers in this file.
-    pub fn row_groups(&self) -> &[RowGroupMetaDataPtr] {
-        &self.row_groups.as_slice()
+    /// Returns slice of row groups in this file.
+    pub fn row_groups(&self) -> &[RowGroupMetaData] {
+        &self.row_groups
     }
 }
 
@@ -185,7 +179,7 @@
 
 /// Metadata for a row group.
 pub struct RowGroupMetaData {
-    columns: Vec<ColumnChunkMetaDataPtr>,
+    columns: Vec<ColumnChunkMetaData>,
     num_rows: i64,
     total_byte_size: i64,
     schema_descr: SchemaDescPtr,
@@ -207,8 +201,8 @@
         &self.columns[i]
     }
 
-    /// Returns slice of column chunk metadata [`Rc`] pointers.
-    pub fn columns(&self) -> &[ColumnChunkMetaDataPtr] {
+    /// Returns slice of column chunk metadata.
+    pub fn columns(&self) -> &[ColumnChunkMetaData] {
         &self.columns
     }
 
@@ -243,7 +237,7 @@
         let mut columns = vec![];
         for (c, d) in rg.columns.drain(0..).zip(schema_descr.columns()) {
             let cc = ColumnChunkMetaData::from_thrift(d.clone(), c)?;
-            columns.push(Rc::new(cc));
+            columns.push(cc);
         }
         Ok(RowGroupMetaData {
             columns,
@@ -266,7 +260,7 @@
 
 /// Builder for row group metadata.
 pub struct RowGroupMetaDataBuilder {
-    columns: Vec<ColumnChunkMetaDataPtr>,
+    columns: Vec<ColumnChunkMetaData>,
     schema_descr: SchemaDescPtr,
     num_rows: i64,
     total_byte_size: i64,
@@ -296,7 +290,7 @@
     }
 
     /// Sets column metadata for this row group.
-    pub fn set_column_metadata(mut self, value: Vec<ColumnChunkMetaDataPtr>) -> Self {
+    pub fn set_column_metadata(mut self, value: Vec<ColumnChunkMetaData>) -> Self {
         self.columns = value;
         self
     }
@@ -320,9 +314,6 @@
     }
 }
 
-/// Reference counted pointer for [`ColumnChunkMetaData`].
-pub type ColumnChunkMetaDataPtr = Rc<ColumnChunkMetaData>;
-
 /// Metadata for a column chunk.
 pub struct ColumnChunkMetaData {
     column_type: Type,
@@ -642,7 +633,7 @@
         let mut columns = vec![];
         for ptr in schema_descr.columns() {
             let column = ColumnChunkMetaData::builder(ptr.clone()).build().unwrap();
-            columns.push(Rc::new(column));
+            columns.push(column);
         }
         let row_group_meta = RowGroupMetaData::builder(schema_descr.clone())
             .set_num_rows(1000)
diff --git a/rust/parquet/src/file/reader.rs b/rust/parquet/src/file/reader.rs
index 123fd84..f2881a3 100644
--- a/rust/parquet/src/file/reader.rs
+++ b/rust/parquet/src/file/reader.rs
@@ -55,13 +55,13 @@
 /// Parquet file, can get reader for each row group, and access record iterator.
 pub trait FileReader {
     /// Get metadata information about this file.
-    fn metadata(&self) -> ParquetMetaDataPtr;
+    fn metadata(&self) -> &ParquetMetaData;
 
     /// Get the total number of row groups for this file.
     fn num_row_groups(&self) -> usize;
 
     /// Get the `i`th row group reader. Note this doesn't do bound check.
-    fn get_row_group(&self, i: usize) -> Result<Box<RowGroupReader>>;
+    fn get_row_group(&self, i: usize) -> Result<Box<RowGroupReader + '_>>;
 
     /// Get full iterator of `Row`s from a file (over all row groups).
     ///
@@ -76,7 +76,7 @@
 /// row group, as well as readers for each individual column chunk.
 pub trait RowGroupReader {
     /// Get metadata information about this row group.
-    fn metadata(&self) -> RowGroupMetaDataPtr;
+    fn metadata(&self) -> &RowGroupMetaData;
 
     /// Get the total number of column chunks in this row group.
     fn num_columns(&self) -> usize;
@@ -144,7 +144,7 @@
 /// A serialized implementation for Parquet [`FileReader`].
 pub struct SerializedFileReader<R: ParquetReader> {
     buf: BufReader<R>,
-    metadata: ParquetMetaDataPtr,
+    metadata: ParquetMetaData,
 }
 
 impl<R: ParquetReader> SerializedFileReader<R> {
@@ -153,10 +153,7 @@
     pub fn new(reader: R) -> Result<Self> {
         let mut buf = BufReader::new(reader);
         let metadata = Self::parse_metadata(&mut buf)?;
-        Ok(Self {
-            buf,
-            metadata: Rc::new(metadata),
-        })
+        Ok(Self { buf, metadata })
     }
 
     // Layout of Parquet file
@@ -205,10 +202,7 @@
         let schema_descr = Rc::new(SchemaDescriptor::new(schema.clone()));
         let mut row_groups = Vec::new();
         for rg in t_file_metadata.row_groups {
-            row_groups.push(Rc::new(RowGroupMetaData::from_thrift(
-                schema_descr.clone(),
-                rg,
-            )?));
+            row_groups.push(RowGroupMetaData::from_thrift(schema_descr.clone(), rg)?);
         }
         let column_orders =
             Self::parse_column_orders(t_file_metadata.column_orders, &schema_descr);
@@ -258,15 +252,15 @@
 }
 
 impl<R: 'static + ParquetReader> FileReader for SerializedFileReader<R> {
-    fn metadata(&self) -> ParquetMetaDataPtr {
-        self.metadata.clone()
+    fn metadata(&self) -> &ParquetMetaData {
+        &self.metadata
     }
 
     fn num_row_groups(&self) -> usize {
         self.metadata.num_row_groups()
     }
 
-    fn get_row_group(&self, i: usize) -> Result<Box<RowGroupReader>> {
+    fn get_row_group(&self, i: usize) -> Result<Box<RowGroupReader + '_>> {
         let row_group_metadata = self.metadata.row_group(i);
         // Row groups should be processed sequentially.
         let f = self.buf.get_ref().try_clone()?;
@@ -326,22 +320,22 @@
 }
 
 /// A serialized implementation for Parquet [`RowGroupReader`].
-pub struct SerializedRowGroupReader<R: ParquetReader> {
+pub struct SerializedRowGroupReader<'a, R: ParquetReader> {
     buf: BufReader<R>,
-    metadata: RowGroupMetaDataPtr,
+    metadata: &'a RowGroupMetaData,
 }
 
-impl<R: 'static + ParquetReader> SerializedRowGroupReader<R> {
+impl<'a, R: 'static + ParquetReader> SerializedRowGroupReader<'a, R> {
     /// Creates new row group reader from a file and row group metadata.
-    fn new(file: R, metadata: RowGroupMetaDataPtr) -> Self {
+    fn new(file: R, metadata: &'a RowGroupMetaData) -> Self {
         let buf = BufReader::new(file);
         Self { buf, metadata }
     }
 }
 
-impl<R: 'static + ParquetReader> RowGroupReader for SerializedRowGroupReader<R> {
-    fn metadata(&self) -> RowGroupMetaDataPtr {
-        self.metadata.clone()
+impl<'a, R: 'static + ParquetReader> RowGroupReader for SerializedRowGroupReader<'a, R> {
+    fn metadata(&self) -> &RowGroupMetaData {
+        &self.metadata
     }
 
     fn num_columns(&self) -> usize {
@@ -602,7 +596,7 @@
         let num_columns = file_reader
             .metadata()
             .file_metadata()
-            .schema_descr_ptr()
+            .schema_descr()
             .num_columns();
 
         if column_index >= num_columns {
diff --git a/rust/parquet/src/file/writer.rs b/rust/parquet/src/file/writer.rs
index 14a2703..f5105db 100644
--- a/rust/parquet/src/file/writer.rs
+++ b/rust/parquet/src/file/writer.rs
@@ -265,7 +265,7 @@
     column_index: usize,
     previous_writer_closed: bool,
     row_group_metadata: Option<RowGroupMetaDataPtr>,
-    column_chunks: Vec<ColumnChunkMetaDataPtr>,
+    column_chunks: Vec<ColumnChunkMetaData>,
 }
 
 impl<W: 'static + ParquetWriter> SerializedRowGroupWriter<W> {
@@ -303,7 +303,7 @@
 
         // Update row group writer metrics
         self.total_bytes_written += bytes_written;
-        self.column_chunks.push(Rc::new(metadata));
+        self.column_chunks.push(metadata);
         if let Some(rows) = self.total_rows_written {
             if rows != rows_written {
                 return Err(general_err!(
@@ -372,8 +372,9 @@
         if self.row_group_metadata.is_none() {
             self.assert_previous_writer_closed()?;
 
+            let column_chunks = std::mem::replace(&mut self.column_chunks, vec![]);
             let row_group_metadata = RowGroupMetaData::builder(self.descr.clone())
-                .set_column_metadata(self.column_chunks.clone())
+                .set_column_metadata(column_chunks)
                 .set_total_byte_size(self.total_bytes_written as i64)
                 .set_num_rows(self.total_rows_written.unwrap_or(0) as i64)
                 .build()?;
diff --git a/rust/parquet/src/record/triplet.rs b/rust/parquet/src/record/triplet.rs
index 9915b18..86c767f 100644
--- a/rust/parquet/src/record/triplet.rs
+++ b/rust/parquet/src/record/triplet.rs
@@ -518,8 +518,9 @@
     ) {
         let file = get_test_file(file_name);
         let file_reader = SerializedFileReader::new(file).unwrap();
+        let metadata = file_reader.metadata();
         // Get schema descriptor
-        let file_metadata = file_reader.metadata().file_metadata();
+        let file_metadata = metadata.file_metadata();
         let schema = file_metadata.schema_descr();
         // Get first row group
         let row_group_reader = file_reader.get_row_group(0).unwrap();