| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| //! Contains the column writer API. |
| |
| use bytes::Bytes; |
| use half::f16; |
| |
| use crate::bloom_filter::Sbbf; |
| use crate::file::page_index::column_index::ColumnIndexMetaData; |
| use crate::file::page_index::offset_index::OffsetIndexMetaData; |
| use std::collections::{BTreeSet, VecDeque}; |
| use std::str; |
| |
| use crate::basic::{ |
| BoundaryOrder, Compression, ConvertedType, Encoding, EncodingMask, LogicalType, PageType, Type, |
| }; |
| use crate::column::page::{CompressedPage, Page, PageWriteSpec, PageWriter}; |
| use crate::column::writer::encoder::{ColumnValueEncoder, ColumnValueEncoderImpl, ColumnValues}; |
| use crate::compression::{Codec, CodecOptionsBuilder, create_codec}; |
| use crate::data_type::private::ParquetValueType; |
| use crate::data_type::*; |
| use crate::encodings::levels::LevelEncoder; |
| #[cfg(feature = "encryption")] |
| use crate::encryption::encrypt::get_column_crypto_metadata; |
| use crate::errors::{ParquetError, Result}; |
| use crate::file::metadata::{ |
| ColumnChunkMetaData, ColumnChunkMetaDataBuilder, ColumnIndexBuilder, LevelHistogram, |
| OffsetIndexBuilder, PageEncodingStats, |
| }; |
| use crate::file::properties::{ |
| EnabledStatistics, WriterProperties, WriterPropertiesPtr, WriterVersion, |
| }; |
| use crate::file::statistics::{Statistics, ValueStatistics}; |
| use crate::schema::types::{ColumnDescPtr, ColumnDescriptor}; |
| |
| pub(crate) mod encoder; |
| |
| macro_rules! downcast_writer { |
| ($e:expr, $i:ident, $b:expr) => { |
| match $e { |
| Self::BoolColumnWriter($i) => $b, |
| Self::Int32ColumnWriter($i) => $b, |
| Self::Int64ColumnWriter($i) => $b, |
| Self::Int96ColumnWriter($i) => $b, |
| Self::FloatColumnWriter($i) => $b, |
| Self::DoubleColumnWriter($i) => $b, |
| Self::ByteArrayColumnWriter($i) => $b, |
| Self::FixedLenByteArrayColumnWriter($i) => $b, |
| } |
| }; |
| } |
| |
| /// Column writer for a Parquet type. |
| /// |
| /// See [`get_column_writer`] to create instances of this type |
| pub enum ColumnWriter<'a> { |
| /// Column writer for boolean type |
| BoolColumnWriter(ColumnWriterImpl<'a, BoolType>), |
| /// Column writer for int32 type |
| Int32ColumnWriter(ColumnWriterImpl<'a, Int32Type>), |
| /// Column writer for int64 type |
| Int64ColumnWriter(ColumnWriterImpl<'a, Int64Type>), |
| /// Column writer for int96 (timestamp) type |
| Int96ColumnWriter(ColumnWriterImpl<'a, Int96Type>), |
| /// Column writer for float type |
| FloatColumnWriter(ColumnWriterImpl<'a, FloatType>), |
| /// Column writer for double type |
| DoubleColumnWriter(ColumnWriterImpl<'a, DoubleType>), |
| /// Column writer for byte array type |
| ByteArrayColumnWriter(ColumnWriterImpl<'a, ByteArrayType>), |
| /// Column writer for fixed length byte array type |
| FixedLenByteArrayColumnWriter(ColumnWriterImpl<'a, FixedLenByteArrayType>), |
| } |
| |
| impl ColumnWriter<'_> { |
| /// Returns the estimated total memory usage |
| #[cfg(feature = "arrow")] |
| pub(crate) fn memory_size(&self) -> usize { |
| downcast_writer!(self, typed, typed.memory_size()) |
| } |
| |
| /// Returns the estimated total encoded bytes for this column writer |
| #[cfg(feature = "arrow")] |
| pub(crate) fn get_estimated_total_bytes(&self) -> u64 { |
| downcast_writer!(self, typed, typed.get_estimated_total_bytes()) |
| } |
| |
| /// Close this [`ColumnWriter`], returning the metadata for the column chunk. |
| pub fn close(self) -> Result<ColumnCloseResult> { |
| downcast_writer!(self, typed, typed.close()) |
| } |
| } |
| |
| /// Create a specific column writer corresponding to column descriptor `descr`. |
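| /// |
| /// A hedged sketch (not compiled as a doctest), assuming `descr`, `props` and |
| /// `page_writer` were already constructed, e.g. via the file writer machinery: |
| /// |
| /// ```ignore |
| /// // Dispatches on the column's physical type to pick the matching enum variant |
| /// let writer: ColumnWriter = get_column_writer(descr, props, page_writer); |
| /// ``` |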
| pub fn get_column_writer<'a>( |
| descr: ColumnDescPtr, |
| props: WriterPropertiesPtr, |
| page_writer: Box<dyn PageWriter + 'a>, |
| ) -> ColumnWriter<'a> { |
| match descr.physical_type() { |
| Type::BOOLEAN => { |
| ColumnWriter::BoolColumnWriter(ColumnWriterImpl::new(descr, props, page_writer)) |
| } |
| Type::INT32 => { |
| ColumnWriter::Int32ColumnWriter(ColumnWriterImpl::new(descr, props, page_writer)) |
| } |
| Type::INT64 => { |
| ColumnWriter::Int64ColumnWriter(ColumnWriterImpl::new(descr, props, page_writer)) |
| } |
| Type::INT96 => { |
| ColumnWriter::Int96ColumnWriter(ColumnWriterImpl::new(descr, props, page_writer)) |
| } |
| Type::FLOAT => { |
| ColumnWriter::FloatColumnWriter(ColumnWriterImpl::new(descr, props, page_writer)) |
| } |
| Type::DOUBLE => { |
| ColumnWriter::DoubleColumnWriter(ColumnWriterImpl::new(descr, props, page_writer)) |
| } |
| Type::BYTE_ARRAY => { |
| ColumnWriter::ByteArrayColumnWriter(ColumnWriterImpl::new(descr, props, page_writer)) |
| } |
| Type::FIXED_LEN_BYTE_ARRAY => ColumnWriter::FixedLenByteArrayColumnWriter( |
| ColumnWriterImpl::new(descr, props, page_writer), |
| ), |
| } |
| } |
| |
| /// Gets a typed column writer for the specific type `T`, by "up-casting" `col_writer` of |
| /// non-generic type to a generic column writer type `ColumnWriterImpl`. |
| /// |
| /// Panics if the actual enum value of `col_writer` does not match the type `T`. |
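| /// |
| /// A hedged sketch (`ignore`d, setup elided) of downcasting the enum returned by |
| /// [`get_column_writer`] for an INT32 column: |
| /// |
| /// ```ignore |
| /// let col_writer = get_column_writer(descr, props, page_writer); |
| /// let mut typed: ColumnWriterImpl<Int32Type> = get_typed_column_writer(col_writer); |
| /// typed.write_batch(&[1, 2, 3], None, None)?; |
| /// ``` |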
| pub fn get_typed_column_writer<T: DataType>(col_writer: ColumnWriter) -> ColumnWriterImpl<T> { |
| T::get_column_writer(col_writer).unwrap_or_else(|| { |
| panic!( |
| "Failed to convert column writer into a typed column writer for `{}` type", |
| T::get_physical_type() |
| ) |
| }) |
| } |
| |
| /// Similar to `get_typed_column_writer` but returns a reference. |
| pub fn get_typed_column_writer_ref<'a, 'b: 'a, T: DataType>( |
| col_writer: &'b ColumnWriter<'a>, |
| ) -> &'b ColumnWriterImpl<'a, T> { |
| T::get_column_writer_ref(col_writer).unwrap_or_else(|| { |
| panic!( |
| "Failed to convert column writer into a typed column writer for `{}` type", |
| T::get_physical_type() |
| ) |
| }) |
| } |
| |
| /// Similar to `get_typed_column_writer` but returns a mutable reference. |
| pub fn get_typed_column_writer_mut<'a, 'b: 'a, T: DataType>( |
| col_writer: &'a mut ColumnWriter<'b>, |
| ) -> &'a mut ColumnWriterImpl<'b, T> { |
| T::get_column_writer_mut(col_writer).unwrap_or_else(|| { |
| panic!( |
| "Failed to convert column writer into a typed column writer for `{}` type", |
| T::get_physical_type() |
| ) |
| }) |
| } |
| |
| /// The result of closing a column chunk writer, including the column chunk |
| /// metadata and any optional indexes and bloom filter. |
| /// |
| /// Note this structure is returned by [`ColumnWriter::close`]. |
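| /// |
| /// A hedged sketch (`ignore`d) of inspecting this result after closing a writer: |
| /// |
| /// ```ignore |
| /// let r = writer.close()?; |
| /// println!("wrote {} rows in {} bytes", r.rows_written, r.bytes_written); |
| /// assert!(r.metadata.num_values() > 0); |
| /// ``` |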
| #[derive(Debug, Clone)] |
| pub struct ColumnCloseResult { |
| /// The total number of bytes written |
| pub bytes_written: u64, |
| /// The total number of rows written |
| pub rows_written: u64, |
| /// Metadata for this column chunk |
| pub metadata: ColumnChunkMetaData, |
| /// Optional bloom filter for this column |
| pub bloom_filter: Option<Sbbf>, |
| /// Optional column index, for filtering |
| pub column_index: Option<ColumnIndexMetaData>, |
| /// Optional offset index, identifying page locations |
| pub offset_index: Option<OffsetIndexMetaData>, |
| } |
| |
| // Metrics per page |
| #[derive(Default)] |
| struct PageMetrics { |
| num_buffered_values: u32, |
| num_buffered_rows: u32, |
| num_page_nulls: u64, |
| repetition_level_histogram: Option<LevelHistogram>, |
| definition_level_histogram: Option<LevelHistogram>, |
| } |
| |
| impl PageMetrics { |
| fn new() -> Self { |
| Default::default() |
| } |
| |
| /// Initialize the repetition level histogram |
| fn with_repetition_level_histogram(mut self, max_level: i16) -> Self { |
| self.repetition_level_histogram = LevelHistogram::try_new(max_level); |
| self |
| } |
| |
| /// Initialize the definition level histogram |
| fn with_definition_level_histogram(mut self, max_level: i16) -> Self { |
| self.definition_level_histogram = LevelHistogram::try_new(max_level); |
| self |
| } |
| |
| /// Resets the state of this `PageMetrics` to the initial state. |
| /// If histograms have been initialized, their contents will be reset to zero. |
| fn new_page(&mut self) { |
| self.num_buffered_values = 0; |
| self.num_buffered_rows = 0; |
| self.num_page_nulls = 0; |
| if let Some(histogram) = self.repetition_level_histogram.as_mut() { |
| histogram.reset(); |
| } |
| if let Some(histogram) = self.definition_level_histogram.as_mut() { |
| histogram.reset(); |
| } |
| } |
| |
| /// Updates histogram values using provided repetition levels |
| fn update_repetition_level_histogram(&mut self, levels: &[i16]) { |
| if let Some(ref mut rep_hist) = self.repetition_level_histogram { |
| rep_hist.update_from_levels(levels); |
| } |
| } |
| |
| /// Updates histogram values using provided definition levels |
| fn update_definition_level_histogram(&mut self, levels: &[i16]) { |
| if let Some(ref mut def_hist) = self.definition_level_histogram { |
| def_hist.update_from_levels(levels); |
| } |
| } |
| } |
| |
| // Metrics per column writer |
| #[derive(Default)] |
| struct ColumnMetrics<T: Default> { |
| total_bytes_written: u64, |
| total_rows_written: u64, |
| total_uncompressed_size: u64, |
| total_compressed_size: u64, |
| total_num_values: u64, |
| dictionary_page_offset: Option<u64>, |
| data_page_offset: Option<u64>, |
| min_column_value: Option<T>, |
| max_column_value: Option<T>, |
| num_column_nulls: u64, |
| column_distinct_count: Option<u64>, |
| variable_length_bytes: Option<i64>, |
| repetition_level_histogram: Option<LevelHistogram>, |
| definition_level_histogram: Option<LevelHistogram>, |
| } |
| |
| impl<T: Default> ColumnMetrics<T> { |
| fn new() -> Self { |
| Default::default() |
| } |
| |
| /// Initialize the repetition level histogram |
| fn with_repetition_level_histogram(mut self, max_level: i16) -> Self { |
| self.repetition_level_histogram = LevelHistogram::try_new(max_level); |
| self |
| } |
| |
| /// Initialize the definition level histogram |
| fn with_definition_level_histogram(mut self, max_level: i16) -> Self { |
| self.definition_level_histogram = LevelHistogram::try_new(max_level); |
| self |
| } |
| |
| /// Sum `page_histogram` into `chunk_histogram` |
| fn update_histogram( |
| chunk_histogram: &mut Option<LevelHistogram>, |
| page_histogram: &Option<LevelHistogram>, |
| ) { |
| if let (Some(page_hist), Some(chunk_hist)) = (page_histogram, chunk_histogram) { |
| chunk_hist.add(page_hist); |
| } |
| } |
| |
| /// Sum the provided PageMetrics histograms into the chunk histograms. Does nothing if |
| /// page histograms are not initialized. |
| fn update_from_page_metrics(&mut self, page_metrics: &PageMetrics) { |
| ColumnMetrics::<T>::update_histogram( |
| &mut self.definition_level_histogram, |
| &page_metrics.definition_level_histogram, |
| ); |
| ColumnMetrics::<T>::update_histogram( |
| &mut self.repetition_level_histogram, |
| &page_metrics.repetition_level_histogram, |
| ); |
| } |
| |
| /// Sum the provided page variable_length_bytes into the chunk variable_length_bytes |
| fn update_variable_length_bytes(&mut self, variable_length_bytes: Option<i64>) { |
| if let Some(var_bytes) = variable_length_bytes { |
| *self.variable_length_bytes.get_or_insert(0) += var_bytes; |
| } |
| } |
| } |
| |
| /// Typed column writer for a primitive column. |
| pub type ColumnWriterImpl<'a, T> = GenericColumnWriter<'a, ColumnValueEncoderImpl<T>>; |
| |
| /// Generic column writer for a primitive Parquet column |
| pub struct GenericColumnWriter<'a, E: ColumnValueEncoder> { |
| // Column writer properties |
| descr: ColumnDescPtr, |
| props: WriterPropertiesPtr, |
| statistics_enabled: EnabledStatistics, |
| |
| page_writer: Box<dyn PageWriter + 'a>, |
| codec: Compression, |
| compressor: Option<Box<dyn Codec>>, |
| encoder: E, |
| |
| page_metrics: PageMetrics, |
| // Metrics per column writer |
| column_metrics: ColumnMetrics<E::T>, |
| |
| /// The order of encodings within the generated metadata does not impact its meaning, |
| /// but we use a BTreeSet so that the output is deterministic |
| encodings: BTreeSet<Encoding>, |
| encoding_stats: Vec<PageEncodingStats>, |
| // Reused buffers |
| def_levels_sink: Vec<i16>, |
| rep_levels_sink: Vec<i16>, |
| data_pages: VecDeque<CompressedPage>, |
| // column index and offset index |
| column_index_builder: ColumnIndexBuilder, |
| offset_index_builder: Option<OffsetIndexBuilder>, |
| |
| // Below fields used to incrementally check boundary order across data pages. |
| // We assume they are ascending/descending until proven wrong. |
| data_page_boundary_ascending: bool, |
| data_page_boundary_descending: bool, |
| /// (min, max) of the last data page containing non-null values |
| last_non_null_data_page_min_max: Option<(E::T, E::T)>, |
| } |
| |
| impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> { |
| /// Returns a new instance of [`GenericColumnWriter`]. |
| pub fn new( |
| descr: ColumnDescPtr, |
| props: WriterPropertiesPtr, |
| page_writer: Box<dyn PageWriter + 'a>, |
| ) -> Self { |
| let codec = props.compression(descr.path()); |
| let codec_options = CodecOptionsBuilder::default().build(); |
| let compressor = create_codec(codec, &codec_options).unwrap(); |
| let encoder = E::try_new(&descr, props.as_ref()).unwrap(); |
| |
| let statistics_enabled = props.statistics_enabled(descr.path()); |
| |
| let mut encodings = BTreeSet::new(); |
| // Used for level information |
| encodings.insert(Encoding::RLE); |
| |
| let mut page_metrics = PageMetrics::new(); |
| let mut column_metrics = ColumnMetrics::<E::T>::new(); |
| |
| // Initialize level histograms if collecting page or chunk statistics |
| if statistics_enabled != EnabledStatistics::None { |
| page_metrics = page_metrics |
| .with_repetition_level_histogram(descr.max_rep_level()) |
| .with_definition_level_histogram(descr.max_def_level()); |
| column_metrics = column_metrics |
| .with_repetition_level_histogram(descr.max_rep_level()) |
| .with_definition_level_histogram(descr.max_def_level()) |
| } |
| |
| // Disable column_index_builder if not collecting page statistics. |
| let mut column_index_builder = ColumnIndexBuilder::new(descr.physical_type()); |
| if statistics_enabled != EnabledStatistics::Page { |
| column_index_builder.to_invalid() |
| } |
| |
| // Disable offset_index_builder if requested by user. |
| let offset_index_builder = match props.offset_index_disabled() { |
| false => Some(OffsetIndexBuilder::new()), |
| _ => None, |
| }; |
| |
| Self { |
| descr, |
| props, |
| statistics_enabled, |
| page_writer, |
| codec, |
| compressor, |
| encoder, |
| def_levels_sink: vec![], |
| rep_levels_sink: vec![], |
| data_pages: VecDeque::new(), |
| page_metrics, |
| column_metrics, |
| column_index_builder, |
| offset_index_builder, |
| encodings, |
| encoding_stats: vec![], |
| data_page_boundary_ascending: true, |
| data_page_boundary_descending: true, |
| last_non_null_data_page_min_max: None, |
| } |
| } |
| |
| #[allow(clippy::too_many_arguments)] |
| pub(crate) fn write_batch_internal( |
| &mut self, |
| values: &E::Values, |
| value_indices: Option<&[usize]>, |
| def_levels: Option<&[i16]>, |
| rep_levels: Option<&[i16]>, |
| min: Option<&E::T>, |
| max: Option<&E::T>, |
| distinct_count: Option<u64>, |
| ) -> Result<usize> { |
| // Check if number of definition levels is the same as number of repetition levels. |
| if let (Some(def), Some(rep)) = (def_levels, rep_levels) { |
| if def.len() != rep.len() { |
| return Err(general_err!( |
| "Inconsistent length of definition and repetition levels: {} != {}", |
| def.len(), |
| rep.len() |
| )); |
| } |
| } |
| |
| // We check for DataPage limits only after we have inserted the values. If a user |
| // writes a large number of values, the DataPage size can be well above the limit. |
| // |
| // The purpose of this chunking is to bound this. Even if a user writes a large |
| // number of values, the chunking will ensure that we add data pages at a |
| // reasonable page size limit. |
| |
| // TODO: find out why we don't account for size of levels when we estimate page |
| // size. |
| |
| let num_levels = match def_levels { |
| Some(def_levels) => def_levels.len(), |
| None => values.len(), |
| }; |
| |
| if let Some(min) = min { |
| update_min(&self.descr, min, &mut self.column_metrics.min_column_value); |
| } |
| if let Some(max) = max { |
| update_max(&self.descr, max, &mut self.column_metrics.max_column_value); |
| } |
| |
| // We can only set the distinct count if there are no other writes |
| if self.encoder.num_values() == 0 { |
| self.column_metrics.column_distinct_count = distinct_count; |
| } else { |
| self.column_metrics.column_distinct_count = None; |
| } |
| |
| let mut values_offset = 0; |
| let mut levels_offset = 0; |
| let base_batch_size = self.props.write_batch_size(); |
| while levels_offset < num_levels { |
| let mut end_offset = num_levels.min(levels_offset + base_batch_size); |
| |
| // Split at record boundary |
| if let Some(r) = rep_levels { |
| while end_offset < r.len() && r[end_offset] != 0 { |
| end_offset += 1; |
| } |
| } |
| |
| values_offset += self.write_mini_batch( |
| values, |
| values_offset, |
| value_indices, |
| end_offset - levels_offset, |
| def_levels.map(|lv| &lv[levels_offset..end_offset]), |
| rep_levels.map(|lv| &lv[levels_offset..end_offset]), |
| )?; |
| levels_offset = end_offset; |
| } |
| |
| // Return total number of values processed. |
| Ok(values_offset) |
| } |
| |
| /// Writes a batch of values, definition levels and repetition levels. |
| /// Returns the number of values processed (written). |
| /// |
| /// If definition and repetition levels are provided, we write those levels in |
| /// full and derive how many values to write (this number is returned), since |
| /// the number of actually written values may be smaller than the number of |
| /// provided values. |
| /// |
| /// If only values are provided, then all values are written and the length |
| /// of the values buffer is returned. |
| /// |
| /// Definition and/or repetition levels can be omitted if values are |
| /// non-nullable and/or non-repeated. |
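| /// |
| /// # Example |
| /// |
| /// A hedged sketch (`ignore`d, writer setup elided) for a nullable column with |
| /// `max_def_level == 1` and no repetition: |
| /// |
| /// ```ignore |
| /// // 1 = value present, 0 = null; three 1s mean three values are consumed |
| /// let values = [10, 20, 30]; |
| /// let def_levels = [1, 0, 1, 1, 0]; |
| /// let written = writer.write_batch(&values, Some(&def_levels), None)?; |
| /// assert_eq!(written, 3); |
| /// ``` |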
| pub fn write_batch( |
| &mut self, |
| values: &E::Values, |
| def_levels: Option<&[i16]>, |
| rep_levels: Option<&[i16]>, |
| ) -> Result<usize> { |
| self.write_batch_internal(values, None, def_levels, rep_levels, None, None, None) |
| } |
| |
| /// Writer may optionally provide pre-calculated statistics for use when computing |
| /// chunk-level statistics |
| /// |
| /// NB: [`WriterProperties::statistics_enabled`] must be set to [`EnabledStatistics::Chunk`] |
| /// for these statistics to take effect. If [`EnabledStatistics::None`] they will be ignored, |
| /// and if [`EnabledStatistics::Page`] the chunk statistics will instead be computed from the |
| /// computed page statistics |
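| /// |
| /// A hedged sketch (`ignore`d) of supplying pre-computed statistics for a batch: |
| /// |
| /// ```ignore |
| /// writer.write_batch_with_statistics( |
| /// &[1, 2, 3], |
| /// None, |
| /// None, |
| /// Some(&1), // pre-computed min |
| /// Some(&3), // pre-computed max |
| /// Some(3), // pre-computed distinct count |
| /// )?; |
| /// ``` |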
| pub fn write_batch_with_statistics( |
| &mut self, |
| values: &E::Values, |
| def_levels: Option<&[i16]>, |
| rep_levels: Option<&[i16]>, |
| min: Option<&E::T>, |
| max: Option<&E::T>, |
| distinct_count: Option<u64>, |
| ) -> Result<usize> { |
| self.write_batch_internal( |
| values, |
| None, |
| def_levels, |
| rep_levels, |
| min, |
| max, |
| distinct_count, |
| ) |
| } |
| |
| /// Returns the estimated total memory usage. |
| /// |
| /// Unlike [`Self::get_estimated_total_bytes`] this is an estimate |
| /// of the current memory usage and not the final anticipated encoded size. |
| #[cfg(feature = "arrow")] |
| pub(crate) fn memory_size(&self) -> usize { |
| self.column_metrics.total_bytes_written as usize + self.encoder.estimated_memory_size() |
| } |
| |
| /// Returns total number of bytes written by this column writer so far. |
| /// This value is also returned when column writer is closed. |
| /// |
| /// Note: this value does not include any buffered data that has not |
| /// yet been flushed to a page. |
| pub fn get_total_bytes_written(&self) -> u64 { |
| self.column_metrics.total_bytes_written |
| } |
| |
| /// Returns the estimated total encoded bytes for this column writer. |
| /// |
| /// Unlike [`Self::get_total_bytes_written`] this includes an estimate |
| /// of any data that has not yet been flushed to a page, based on its |
| /// anticipated encoded size. |
| #[cfg(feature = "arrow")] |
| pub(crate) fn get_estimated_total_bytes(&self) -> u64 { |
| self.data_pages |
| .iter() |
| .map(|page| page.data().len() as u64) |
| .sum::<u64>() |
| + self.column_metrics.total_bytes_written |
| + self.encoder.estimated_data_page_size() as u64 |
| + self.encoder.estimated_dict_page_size().unwrap_or_default() as u64 |
| } |
| |
| /// Returns total number of rows written by this column writer so far. |
| /// This value is also returned when column writer is closed. |
| pub fn get_total_rows_written(&self) -> u64 { |
| self.column_metrics.total_rows_written |
| } |
| |
| /// Returns a reference to a [`ColumnDescPtr`] |
| pub fn get_descriptor(&self) -> &ColumnDescPtr { |
| &self.descr |
| } |
| |
| /// Finalizes writes and closes the column writer. |
| /// Returns total bytes written, total rows written and column chunk metadata. |
| pub fn close(mut self) -> Result<ColumnCloseResult> { |
| if self.page_metrics.num_buffered_values > 0 { |
| self.add_data_page()?; |
| } |
| if self.encoder.has_dictionary() { |
| self.write_dictionary_page()?; |
| } |
| self.flush_data_pages()?; |
| let metadata = self.build_column_metadata()?; |
| self.page_writer.close()?; |
| |
| let boundary_order = match ( |
| self.data_page_boundary_ascending, |
| self.data_page_boundary_descending, |
| ) { |
| // If the lists are composed of equal elements then the order will be marked |
| // as ascending (also the case if all pages are null pages) |
| (true, _) => BoundaryOrder::ASCENDING, |
| (false, true) => BoundaryOrder::DESCENDING, |
| (false, false) => BoundaryOrder::UNORDERED, |
| }; |
| self.column_index_builder.set_boundary_order(boundary_order); |
| |
| let column_index = match self.column_index_builder.valid() { |
| true => Some(self.column_index_builder.build()?), |
| false => None, |
| }; |
| |
| let offset_index = self.offset_index_builder.map(|b| b.build()); |
| |
| Ok(ColumnCloseResult { |
| bytes_written: self.column_metrics.total_bytes_written, |
| rows_written: self.column_metrics.total_rows_written, |
| bloom_filter: self.encoder.flush_bloom_filter(), |
| metadata, |
| column_index, |
| offset_index, |
| }) |
| } |
| |
| /// Writes mini batch of values, definition and repetition levels. |
| /// This allows fine-grained processing of values and maintaining a reasonable |
| /// page size. |
| fn write_mini_batch( |
| &mut self, |
| values: &E::Values, |
| values_offset: usize, |
| value_indices: Option<&[usize]>, |
| num_levels: usize, |
| def_levels: Option<&[i16]>, |
| rep_levels: Option<&[i16]>, |
| ) -> Result<usize> { |
| // Process definition levels and determine how many values to write. |
| let values_to_write = if self.descr.max_def_level() > 0 { |
| let levels = def_levels.ok_or_else(|| { |
| general_err!( |
| "Definition levels are required, because max definition level = {}", |
| self.descr.max_def_level() |
| ) |
| })?; |
| |
| let values_to_write = levels |
| .iter() |
| .map(|level| (*level == self.descr.max_def_level()) as usize) |
| .sum(); |
| self.page_metrics.num_page_nulls += (levels.len() - values_to_write) as u64; |
| |
| // Update histogram |
| self.page_metrics.update_definition_level_histogram(levels); |
| |
| self.def_levels_sink.extend_from_slice(levels); |
| values_to_write |
| } else { |
| num_levels |
| }; |
| |
| // Process repetition levels and determine how many rows we are about to process. |
| if self.descr.max_rep_level() > 0 { |
| // A row could contain more than one value. |
| let levels = rep_levels.ok_or_else(|| { |
| general_err!( |
| "Repetition levels are required, because max repetition level = {}", |
| self.descr.max_rep_level() |
| ) |
| })?; |
| |
| if !levels.is_empty() && levels[0] != 0 { |
| return Err(general_err!( |
| "Write must start at a record boundary, got non-zero repetition level of {}", |
| levels[0] |
| )); |
| } |
| |
| // Count the occasions where we start a new row |
| for &level in levels { |
| self.page_metrics.num_buffered_rows += (level == 0) as u32 |
| } |
| |
| // Update histogram |
| self.page_metrics.update_repetition_level_histogram(levels); |
| |
| self.rep_levels_sink.extend_from_slice(levels); |
| } else { |
| // Each value is exactly one row. |
| // The row count equals the number of levels; nulls are counted as well. |
| self.page_metrics.num_buffered_rows += num_levels as u32; |
| } |
| |
| match value_indices { |
| Some(indices) => { |
| let indices = &indices[values_offset..values_offset + values_to_write]; |
| self.encoder.write_gather(values, indices)?; |
| } |
| None => self.encoder.write(values, values_offset, values_to_write)?, |
| } |
| |
| self.page_metrics.num_buffered_values += num_levels as u32; |
| |
| if self.should_add_data_page() { |
| self.add_data_page()?; |
| } |
| |
| if self.should_dict_fallback() { |
| self.dict_fallback()?; |
| } |
| |
| Ok(values_to_write) |
| } |
| |
| /// Returns true if we need to fall back to non-dictionary encoding. |
| /// |
| /// We can only fall back if the dictionary encoder is set and we have exceeded the |
| /// dictionary page size limit. |
| #[inline] |
| fn should_dict_fallback(&self) -> bool { |
| match self.encoder.estimated_dict_page_size() { |
| Some(size) => { |
| size >= self |
| .props |
| .column_dictionary_page_size_limit(self.descr.path()) |
| } |
| None => false, |
| } |
| } |
| |
| /// Returns true if there is enough data for a data page, false otherwise. |
| #[inline] |
| fn should_add_data_page(&self) -> bool { |
| // This is necessary in the event of a much larger dictionary size than page size |
| // |
| // In such a scenario the dictionary encoder may return an estimated encoded |
| // size in excess of the page size limit, even when there are no buffered values |
| if self.page_metrics.num_buffered_values == 0 { |
| return false; |
| } |
| |
| self.page_metrics.num_buffered_rows as usize >= self.props.data_page_row_count_limit() |
| || self.encoder.estimated_data_page_size() >= self.props.data_page_size_limit() |
| } |
| |
| /// Performs dictionary fallback. |
| /// Prepares and writes the dictionary page and all buffered data pages into the page writer. |
| fn dict_fallback(&mut self) -> Result<()> { |
| // At this point we know that we need to fall back. |
| if self.page_metrics.num_buffered_values > 0 { |
| self.add_data_page()?; |
| } |
| self.write_dictionary_page()?; |
| self.flush_data_pages()?; |
| Ok(()) |
| } |
| |
| /// Update the column index and offset index when adding the data page |
| fn update_column_offset_index( |
| &mut self, |
| page_statistics: Option<&ValueStatistics<E::T>>, |
| page_variable_length_bytes: Option<i64>, |
| ) { |
| // update the column index |
| let null_page = |
| (self.page_metrics.num_buffered_rows as u64) == self.page_metrics.num_page_nulls; |
| // If the page contains only null values, writers have to set the corresponding |
| // entries in min_values and max_values to a zero-length byte array |
| if null_page && self.column_index_builder.valid() { |
| self.column_index_builder.append( |
| null_page, |
| vec![], |
| vec![], |
| self.page_metrics.num_page_nulls as i64, |
| ); |
| } else if self.column_index_builder.valid() { |
| // Otherwise, derive min/max from the page statistics. |
| // If page statistics can't be obtained, invalidate the column index for this column chunk |
| match &page_statistics { |
| None => { |
| self.column_index_builder.to_invalid(); |
| } |
| Some(stat) => { |
| // Check if min/max are still ascending/descending across pages |
| let new_min = stat.min_opt().unwrap(); |
| let new_max = stat.max_opt().unwrap(); |
| if let Some((last_min, last_max)) = &self.last_non_null_data_page_min_max { |
| if self.data_page_boundary_ascending { |
| // If last min/max are greater than new min/max then not ascending anymore |
| let not_ascending = compare_greater(&self.descr, last_min, new_min) |
| || compare_greater(&self.descr, last_max, new_max); |
| if not_ascending { |
| self.data_page_boundary_ascending = false; |
| } |
| } |
| |
| if self.data_page_boundary_descending { |
| // If new min/max are greater than last min/max then not descending anymore |
| let not_descending = compare_greater(&self.descr, new_min, last_min) |
| || compare_greater(&self.descr, new_max, last_max); |
| if not_descending { |
| self.data_page_boundary_descending = false; |
| } |
| } |
| } |
| self.last_non_null_data_page_min_max = Some((new_min.clone(), new_max.clone())); |
| |
| if self.can_truncate_value() { |
| self.column_index_builder.append( |
| null_page, |
| self.truncate_min_value( |
| self.props.column_index_truncate_length(), |
| stat.min_bytes_opt().unwrap(), |
| ) |
| .0, |
| self.truncate_max_value( |
| self.props.column_index_truncate_length(), |
| stat.max_bytes_opt().unwrap(), |
| ) |
| .0, |
| self.page_metrics.num_page_nulls as i64, |
| ); |
| } else { |
| self.column_index_builder.append( |
| null_page, |
| stat.min_bytes_opt().unwrap().to_vec(), |
| stat.max_bytes_opt().unwrap().to_vec(), |
| self.page_metrics.num_page_nulls as i64, |
| ); |
| } |
| } |
| } |
| } |
| |
| // Append page histograms to the `ColumnIndex` histograms |
| self.column_index_builder.append_histograms( |
| &self.page_metrics.repetition_level_histogram, |
| &self.page_metrics.definition_level_histogram, |
| ); |
| |
| // Update the offset index |
| if let Some(builder) = self.offset_index_builder.as_mut() { |
| builder.append_row_count(self.page_metrics.num_buffered_rows as i64); |
| builder.append_unencoded_byte_array_data_bytes(page_variable_length_bytes); |
| } |
| } |
| |
| /// Determine if we should allow truncating min/max values for this column's statistics |
| fn can_truncate_value(&self) -> bool { |
| match self.descr.physical_type() { |
| // Don't truncate for Float16 and Decimal because their sort order is different |
| // from the byte-wise FIXED_LEN_BYTE_ARRAY sort order, so truncating those types |
| // could lead to inaccurate min/max statistics |
| Type::FIXED_LEN_BYTE_ARRAY |
| if !matches!( |
| self.descr.logical_type(), |
| Some(LogicalType::Decimal { .. }) | Some(LogicalType::Float16) |
| ) => |
| { |
| true |
| } |
| Type::BYTE_ARRAY => true, |
| // Truncation only applies to the BYTE_ARRAY and FIXED_LEN_BYTE_ARRAY physical types |
| _ => false, |
| } |
| } |
| |
| /// Returns `true` if this column's logical type is a UTF-8 string. |
| fn is_utf8(&self) -> bool { |
| self.get_descriptor().logical_type() == Some(LogicalType::String) |
| || self.get_descriptor().converted_type() == ConvertedType::UTF8 |
| } |
| |
| /// Truncates a binary statistic to at most `truncation_length` bytes. |
| /// |
| /// If truncation is not possible, returns `data`. |
| /// |
| /// The `bool` in the returned tuple indicates whether truncation occurred or not. |
| /// |
| /// UTF-8 Note: |
| /// If the column type indicates UTF-8, and `data` contains valid UTF-8, then the result will |
| /// also remain valid UTF-8, but may be less than `truncation_length` bytes to avoid |
| /// splitting a multi-byte character. |
| fn truncate_min_value(&self, truncation_length: Option<usize>, data: &[u8]) -> (Vec<u8>, bool) { |
| truncation_length |
| .filter(|l| data.len() > *l) |
| .and_then(|l| |
| // don't do extra work if this column isn't UTF-8 |
| if self.is_utf8() { |
| match str::from_utf8(data) { |
| Ok(str_data) => truncate_utf8(str_data, l), |
| Err(_) => Some(data[..l].to_vec()), |
| } |
| } else { |
| Some(data[..l].to_vec()) |
| } |
| ) |
| .map(|truncated| (truncated, true)) |
| .unwrap_or_else(|| (data.to_vec(), false)) |
| } |
| |
| /// Truncates a binary statistic to at most `truncation_length` bytes, and then increments the |
| /// final byte(s) to yield a valid upper bound. The result may be shorter than |
| /// `truncation_length` bytes if the last byte(s) overflow. |
| /// |
| /// If truncation is not possible, returns `data`. |
| /// |
| /// The `bool` in the returned tuple indicates whether truncation occurred or not. |
| /// |
| /// UTF-8 Note: |
| /// If the column type indicates UTF-8, and `data` contains valid UTF-8, then the result will |
| /// also remain valid UTF-8 (but again may be less than `truncation_length` bytes). If `data` |
| /// does not contain valid UTF-8, then truncation will occur as if the column is non-string |
| /// binary. |
| fn truncate_max_value(&self, truncation_length: Option<usize>, data: &[u8]) -> (Vec<u8>, bool) { |
| truncation_length |
| .filter(|l| data.len() > *l) |
| .and_then(|l| |
| // don't do extra work if this column isn't UTF-8 |
| if self.is_utf8() { |
| match str::from_utf8(data) { |
| Ok(str_data) => truncate_and_increment_utf8(str_data, l), |
| Err(_) => increment(data[..l].to_vec()), |
| } |
| } else { |
| increment(data[..l].to_vec()) |
| } |
| ) |
| .map(|truncated| (truncated, true)) |
| .unwrap_or_else(|| (data.to_vec(), false)) |
| } |
| |
| /// Truncate the min and max values that will be written to a data page |
| /// header or column chunk Statistics |
| fn truncate_statistics(&self, statistics: Statistics) -> Statistics { |
| let backwards_compatible_min_max = self.descr.sort_order().is_signed(); |
| match statistics { |
| Statistics::ByteArray(stats) if stats._internal_has_min_max_set() => { |
| let (min, did_truncate_min) = self.truncate_min_value( |
| self.props.statistics_truncate_length(), |
| stats.min_bytes_opt().unwrap(), |
| ); |
| let (max, did_truncate_max) = self.truncate_max_value( |
| self.props.statistics_truncate_length(), |
| stats.max_bytes_opt().unwrap(), |
| ); |
| Statistics::ByteArray( |
| ValueStatistics::new( |
| Some(min.into()), |
| Some(max.into()), |
| stats.distinct_count(), |
| stats.null_count_opt(), |
| backwards_compatible_min_max, |
| ) |
| .with_max_is_exact(!did_truncate_max) |
| .with_min_is_exact(!did_truncate_min), |
| ) |
| } |
| Statistics::FixedLenByteArray(stats) |
| if (stats._internal_has_min_max_set() && self.can_truncate_value()) => |
| { |
| let (min, did_truncate_min) = self.truncate_min_value( |
| self.props.statistics_truncate_length(), |
| stats.min_bytes_opt().unwrap(), |
| ); |
| let (max, did_truncate_max) = self.truncate_max_value( |
| self.props.statistics_truncate_length(), |
| stats.max_bytes_opt().unwrap(), |
| ); |
| Statistics::FixedLenByteArray( |
| ValueStatistics::new( |
| Some(min.into()), |
| Some(max.into()), |
| stats.distinct_count(), |
| stats.null_count_opt(), |
| backwards_compatible_min_max, |
| ) |
| .with_max_is_exact(!did_truncate_max) |
| .with_min_is_exact(!did_truncate_min), |
| ) |
| } |
| stats => stats, |
| } |
| } |
| |
| /// Adds a data page. |
| /// The page is either buffered (when dictionary encoding is in use) or written directly. |
| fn add_data_page(&mut self) -> Result<()> { |
| // Extract encoded values |
| let values_data = self.encoder.flush_data_page()?; |
| |
| let max_def_level = self.descr.max_def_level(); |
| let max_rep_level = self.descr.max_rep_level(); |
| |
| self.column_metrics.num_column_nulls += self.page_metrics.num_page_nulls; |
| |
| let page_statistics = match (values_data.min_value, values_data.max_value) { |
| (Some(min), Some(max)) => { |
| // Update chunk level statistics |
| update_min(&self.descr, &min, &mut self.column_metrics.min_column_value); |
| update_max(&self.descr, &max, &mut self.column_metrics.max_column_value); |
| |
| (self.statistics_enabled == EnabledStatistics::Page).then_some( |
| ValueStatistics::new( |
| Some(min), |
| Some(max), |
| None, |
| Some(self.page_metrics.num_page_nulls), |
| false, |
| ), |
| ) |
| } |
| _ => None, |
| }; |
| |
| // update column and offset index |
| self.update_column_offset_index( |
| page_statistics.as_ref(), |
| values_data.variable_length_bytes, |
| ); |
| |
| // Update histograms and variable_length_bytes in column_metrics |
| self.column_metrics |
| .update_from_page_metrics(&self.page_metrics); |
| self.column_metrics |
| .update_variable_length_bytes(values_data.variable_length_bytes); |
| |
| // From here on, we only need page statistics if they will be written to the page header. |
| let page_statistics = page_statistics |
| .filter(|_| self.props.write_page_header_statistics(self.descr.path())) |
| .map(|stats| self.truncate_statistics(Statistics::from(stats))); |
| |
| let compressed_page = match self.props.writer_version() { |
| WriterVersion::PARQUET_1_0 => { |
| let mut buffer = vec![]; |
| |
| if max_rep_level > 0 { |
| buffer.extend_from_slice( |
| &self.encode_levels_v1( |
| Encoding::RLE, |
| &self.rep_levels_sink[..], |
| max_rep_level, |
| )[..], |
| ); |
| } |
| |
| if max_def_level > 0 { |
| buffer.extend_from_slice( |
| &self.encode_levels_v1( |
| Encoding::RLE, |
| &self.def_levels_sink[..], |
| max_def_level, |
| )[..], |
| ); |
| } |
| |
| buffer.extend_from_slice(&values_data.buf); |
| let uncompressed_size = buffer.len(); |
| |
| if let Some(ref mut cmpr) = self.compressor { |
| let mut compressed_buf = Vec::with_capacity(uncompressed_size); |
| cmpr.compress(&buffer[..], &mut compressed_buf)?; |
| compressed_buf.shrink_to_fit(); |
| buffer = compressed_buf; |
| } |
| |
| let data_page = Page::DataPage { |
| buf: buffer.into(), |
| num_values: self.page_metrics.num_buffered_values, |
| encoding: values_data.encoding, |
| def_level_encoding: Encoding::RLE, |
| rep_level_encoding: Encoding::RLE, |
| statistics: page_statistics, |
| }; |
| |
| CompressedPage::new(data_page, uncompressed_size) |
| } |
| WriterVersion::PARQUET_2_0 => { |
| let mut rep_levels_byte_len = 0; |
| let mut def_levels_byte_len = 0; |
| let mut buffer = vec![]; |
| |
| if max_rep_level > 0 { |
| let levels = self.encode_levels_v2(&self.rep_levels_sink[..], max_rep_level); |
| rep_levels_byte_len = levels.len(); |
| buffer.extend_from_slice(&levels[..]); |
| } |
| |
| if max_def_level > 0 { |
| let levels = self.encode_levels_v2(&self.def_levels_sink[..], max_def_level); |
| def_levels_byte_len = levels.len(); |
| buffer.extend_from_slice(&levels[..]); |
| } |
| |
| let uncompressed_size = |
| rep_levels_byte_len + def_levels_byte_len + values_data.buf.len(); |
| |
| // Data Page v2 compresses values only. |
| let is_compressed = match self.compressor { |
| Some(ref mut cmpr) => { |
| let buffer_len = buffer.len(); |
| cmpr.compress(&values_data.buf, &mut buffer)?; |
| if uncompressed_size <= buffer.len() - buffer_len { |
| buffer.truncate(buffer_len); |
| buffer.extend_from_slice(&values_data.buf); |
| false |
| } else { |
| true |
| } |
| } |
| None => { |
| buffer.extend_from_slice(&values_data.buf); |
| false |
| } |
| }; |
| |
| let data_page = Page::DataPageV2 { |
| buf: buffer.into(), |
| num_values: self.page_metrics.num_buffered_values, |
| encoding: values_data.encoding, |
| num_nulls: self.page_metrics.num_page_nulls as u32, |
| num_rows: self.page_metrics.num_buffered_rows, |
| def_levels_byte_len: def_levels_byte_len as u32, |
| rep_levels_byte_len: rep_levels_byte_len as u32, |
| is_compressed, |
| statistics: page_statistics, |
| }; |
| |
| CompressedPage::new(data_page, uncompressed_size) |
| } |
| }; |
| |
| // Check if we need to buffer data page or flush it to the sink directly. |
| if self.encoder.has_dictionary() { |
| self.data_pages.push_back(compressed_page); |
| } else { |
| self.write_data_page(compressed_page)?; |
| } |
| |
| // Update total number of rows. |
| self.column_metrics.total_rows_written += self.page_metrics.num_buffered_rows as u64; |
| |
| // Reset state. |
| self.rep_levels_sink.clear(); |
| self.def_levels_sink.clear(); |
| self.page_metrics.new_page(); |
| |
| Ok(()) |
| } |
| |
| /// Finalises any outstanding data pages and flushes buffered data pages from |
| /// dictionary encoding into the underlying sink. |
| #[inline] |
| fn flush_data_pages(&mut self) -> Result<()> { |
| // Write all outstanding data to a new page. |
| if self.page_metrics.num_buffered_values > 0 { |
| self.add_data_page()?; |
| } |
| |
| while let Some(page) = self.data_pages.pop_front() { |
| self.write_data_page(page)?; |
| } |
| |
| Ok(()) |
| } |
| |
| /// Assembles column chunk metadata. |
| fn build_column_metadata(&mut self) -> Result<ColumnChunkMetaData> { |
| let total_compressed_size = self.column_metrics.total_compressed_size as i64; |
| let total_uncompressed_size = self.column_metrics.total_uncompressed_size as i64; |
| let num_values = self.column_metrics.total_num_values as i64; |
| let dict_page_offset = self.column_metrics.dictionary_page_offset.map(|v| v as i64); |
| // If data page offset is not set, then no pages have been written |
| let data_page_offset = self.column_metrics.data_page_offset.unwrap_or(0) as i64; |
| |
| let mut builder = ColumnChunkMetaData::builder(self.descr.clone()) |
| .set_compression(self.codec) |
| .set_encodings_mask(EncodingMask::new_from_encodings(self.encodings.iter())) |
| .set_page_encoding_stats(self.encoding_stats.clone()) |
| .set_total_compressed_size(total_compressed_size) |
| .set_total_uncompressed_size(total_uncompressed_size) |
| .set_num_values(num_values) |
| .set_data_page_offset(data_page_offset) |
| .set_dictionary_page_offset(dict_page_offset); |
| |
| if self.statistics_enabled != EnabledStatistics::None { |
| let backwards_compatible_min_max = self.descr.sort_order().is_signed(); |
| |
| let statistics = ValueStatistics::<E::T>::new( |
| self.column_metrics.min_column_value.clone(), |
| self.column_metrics.max_column_value.clone(), |
| self.column_metrics.column_distinct_count, |
| Some(self.column_metrics.num_column_nulls), |
| false, |
| ) |
| .with_backwards_compatible_min_max(backwards_compatible_min_max) |
| .into(); |
| |
| let statistics = self.truncate_statistics(statistics); |
| |
| builder = builder |
| .set_statistics(statistics) |
| .set_unencoded_byte_array_data_bytes(self.column_metrics.variable_length_bytes) |
| .set_repetition_level_histogram( |
| self.column_metrics.repetition_level_histogram.take(), |
| ) |
| .set_definition_level_histogram( |
| self.column_metrics.definition_level_histogram.take(), |
| ); |
| |
| if let Some(geo_stats) = self.encoder.flush_geospatial_statistics() { |
| builder = builder.set_geo_statistics(geo_stats); |
| } |
| } |
| |
| builder = self.set_column_chunk_encryption_properties(builder); |
| |
| let metadata = builder.build()?; |
| Ok(metadata) |
| } |
| |
| /// Encodes definition or repetition levels for Data Page v1. |
| #[inline] |
| fn encode_levels_v1(&self, encoding: Encoding, levels: &[i16], max_level: i16) -> Vec<u8> { |
| let mut encoder = LevelEncoder::v1(encoding, max_level, levels.len()); |
| encoder.put(levels); |
| encoder.consume() |
| } |
| |
| /// Encodes definition or repetition levels for Data Page v2. |
| /// Encoding is always RLE. |
| #[inline] |
| fn encode_levels_v2(&self, levels: &[i16], max_level: i16) -> Vec<u8> { |
| let mut encoder = LevelEncoder::v2(max_level, levels.len()); |
| encoder.put(levels); |
| encoder.consume() |
| } |
| |
| /// Writes compressed data page into underlying sink and updates global metrics. |
| #[inline] |
| fn write_data_page(&mut self, page: CompressedPage) -> Result<()> { |
| self.encodings.insert(page.encoding()); |
| match self.encoding_stats.last_mut() { |
| Some(encoding_stats) |
| if encoding_stats.page_type == page.page_type() |
| && encoding_stats.encoding == page.encoding() => |
| { |
| encoding_stats.count += 1; |
| } |
| _ => { |
| // data page type does not change inside a file |
| // encoding can currently only change from dictionary to non-dictionary once |
| self.encoding_stats.push(PageEncodingStats { |
| page_type: page.page_type(), |
| encoding: page.encoding(), |
| count: 1, |
| }); |
| } |
| } |
| let page_spec = self.page_writer.write_page(page)?; |
| // update offset index |
| // compressed_size = header_size + compressed_data_size |
| if let Some(builder) = self.offset_index_builder.as_mut() { |
| builder |
| .append_offset_and_size(page_spec.offset as i64, page_spec.compressed_size as i32) |
| } |
| self.update_metrics_for_page(page_spec); |
| Ok(()) |
| } |
| |
| /// Writes dictionary page into underlying sink. |
| #[inline] |
| fn write_dictionary_page(&mut self) -> Result<()> { |
| let compressed_page = { |
| let mut page = self |
| .encoder |
| .flush_dict_page()? |
| .ok_or_else(|| general_err!("Dictionary encoder is not set"))?; |
| |
| let uncompressed_size = page.buf.len(); |
| |
| if let Some(ref mut cmpr) = self.compressor { |
| let mut output_buf = Vec::with_capacity(uncompressed_size); |
| cmpr.compress(&page.buf, &mut output_buf)?; |
| page.buf = Bytes::from(output_buf); |
| } |
| |
| let dict_page = Page::DictionaryPage { |
| buf: page.buf, |
| num_values: page.num_values as u32, |
| encoding: self.props.dictionary_page_encoding(), |
| is_sorted: page.is_sorted, |
| }; |
| CompressedPage::new(dict_page, uncompressed_size) |
| }; |
| |
| self.encodings.insert(compressed_page.encoding()); |
| self.encoding_stats.push(PageEncodingStats { |
| page_type: PageType::DICTIONARY_PAGE, |
| encoding: compressed_page.encoding(), |
| count: 1, |
| }); |
| let page_spec = self.page_writer.write_page(compressed_page)?; |
| self.update_metrics_for_page(page_spec); |
| // For the dictionary page, we don't need to update the column/offset index. |
| Ok(()) |
| } |
| |
| /// Updates column writer metrics with the metadata of each written page. |
| #[inline] |
| fn update_metrics_for_page(&mut self, page_spec: PageWriteSpec) { |
| self.column_metrics.total_uncompressed_size += page_spec.uncompressed_size as u64; |
| self.column_metrics.total_compressed_size += page_spec.compressed_size as u64; |
| self.column_metrics.total_bytes_written += page_spec.bytes_written; |
| |
| match page_spec.page_type { |
| PageType::DATA_PAGE | PageType::DATA_PAGE_V2 => { |
| self.column_metrics.total_num_values += page_spec.num_values as u64; |
| if self.column_metrics.data_page_offset.is_none() { |
| self.column_metrics.data_page_offset = Some(page_spec.offset); |
| } |
| } |
| PageType::DICTIONARY_PAGE => { |
| assert!( |
| self.column_metrics.dictionary_page_offset.is_none(), |
| "Dictionary offset is already set" |
| ); |
| self.column_metrics.dictionary_page_offset = Some(page_spec.offset); |
| } |
| _ => {} |
| } |
| } |
| |
| #[inline] |
| #[cfg(feature = "encryption")] |
| fn set_column_chunk_encryption_properties( |
| &self, |
| builder: ColumnChunkMetaDataBuilder, |
| ) -> ColumnChunkMetaDataBuilder { |
| if let Some(encryption_properties) = self.props.file_encryption_properties.as_ref() { |
| builder.set_column_crypto_metadata(get_column_crypto_metadata( |
| encryption_properties, |
| &self.descr, |
| )) |
| } else { |
| builder |
| } |
| } |
| |
| #[inline] |
| #[cfg(not(feature = "encryption"))] |
| fn set_column_chunk_encryption_properties( |
| &self, |
| builder: ColumnChunkMetaDataBuilder, |
| ) -> ColumnChunkMetaDataBuilder { |
| builder |
| } |
| } |
| |
| fn update_min<T: ParquetValueType>(descr: &ColumnDescriptor, val: &T, min: &mut Option<T>) { |
| update_stat::<T, _>(descr, val, min, |cur| compare_greater(descr, cur, val)) |
| } |
| |
| fn update_max<T: ParquetValueType>(descr: &ColumnDescriptor, val: &T, max: &mut Option<T>) { |
| update_stat::<T, _>(descr, val, max, |cur| compare_greater(descr, val, cur)) |
| } |
| |
| #[inline] |
| #[allow(clippy::eq_op)] |
| fn is_nan<T: ParquetValueType>(descr: &ColumnDescriptor, val: &T) -> bool { |
| match T::PHYSICAL_TYPE { |
| Type::FLOAT | Type::DOUBLE => val != val, |
| Type::FIXED_LEN_BYTE_ARRAY if descr.logical_type() == Some(LogicalType::Float16) => { |
| let val = val.as_bytes(); |
| let val = f16::from_le_bytes([val[0], val[1]]); |
| val.is_nan() |
| } |
| _ => false, |
| } |
| } |
| |
| /// Perform a conditional update of `cur`, skipping any NaN values |
| /// |
| /// If `cur` is `None`, sets `cur` to `Some(val)`, otherwise calls `should_update` with |
| /// the value of `cur`, and updates `cur` to `Some(val)` if it returns `true` |
| fn update_stat<T: ParquetValueType, F>( |
| descr: &ColumnDescriptor, |
| val: &T, |
| cur: &mut Option<T>, |
| should_update: F, |
| ) where |
| F: Fn(&T) -> bool, |
| { |
| if is_nan(descr, val) { |
| return; |
| } |
| |
| if cur.as_ref().is_none_or(should_update) { |
| *cur = Some(val.clone()); |
| } |
| } |
| |
| /// Evaluate `a > b` according to underlying logical type. |
| fn compare_greater<T: ParquetValueType>(descr: &ColumnDescriptor, a: &T, b: &T) -> bool { |
| match T::PHYSICAL_TYPE { |
| Type::INT32 | Type::INT64 => { |
| if let Some(LogicalType::Integer { |
| is_signed: false, .. |
| }) = descr.logical_type() |
| { |
| // need to compare unsigned |
| return compare_greater_unsigned_int(a, b); |
| } |
| |
| match descr.converted_type() { |
| ConvertedType::UINT_8 |
| | ConvertedType::UINT_16 |
| | ConvertedType::UINT_32 |
| | ConvertedType::UINT_64 => { |
| return compare_greater_unsigned_int(a, b); |
| } |
| _ => {} |
| }; |
| } |
| Type::FIXED_LEN_BYTE_ARRAY | Type::BYTE_ARRAY => { |
| if let Some(LogicalType::Decimal { .. }) = descr.logical_type() { |
| return compare_greater_byte_array_decimals(a.as_bytes(), b.as_bytes()); |
| } |
| if let ConvertedType::DECIMAL = descr.converted_type() { |
| return compare_greater_byte_array_decimals(a.as_bytes(), b.as_bytes()); |
| } |
| if let Some(LogicalType::Float16) = descr.logical_type() { |
| return compare_greater_f16(a.as_bytes(), b.as_bytes()); |
| } |
| } |
| |
| _ => {} |
| } |
| |
| // compare independent of logical / converted type |
| a > b |
| } |
| |
| // ---------------------------------------------------------------------- |
| // Encoding support for column writer. |
| // This mirrors parquet-mr default encodings for writes. See: |
| // https://github.com/apache/parquet-mr/blob/master/parquet-column/src/main/java/org/apache/parquet/column/values/factory/DefaultV1ValuesWriterFactory.java |
| // https://github.com/apache/parquet-mr/blob/master/parquet-column/src/main/java/org/apache/parquet/column/values/factory/DefaultV2ValuesWriterFactory.java |
| |
| /// Returns encoding for a column when no other encoding is provided in writer properties. |
| fn fallback_encoding(kind: Type, props: &WriterProperties) -> Encoding { |
| match (kind, props.writer_version()) { |
| (Type::BOOLEAN, WriterVersion::PARQUET_2_0) => Encoding::RLE, |
| (Type::INT32, WriterVersion::PARQUET_2_0) => Encoding::DELTA_BINARY_PACKED, |
| (Type::INT64, WriterVersion::PARQUET_2_0) => Encoding::DELTA_BINARY_PACKED, |
| (Type::BYTE_ARRAY, WriterVersion::PARQUET_2_0) => Encoding::DELTA_BYTE_ARRAY, |
| (Type::FIXED_LEN_BYTE_ARRAY, WriterVersion::PARQUET_2_0) => Encoding::DELTA_BYTE_ARRAY, |
| _ => Encoding::PLAIN, |
| } |
| } |
| |
| /// Returns true if dictionary is supported for column writer, false otherwise. |
| fn has_dictionary_support(kind: Type, props: &WriterProperties) -> bool { |
| match (kind, props.writer_version()) { |
| // Booleans do not support dict encoding and should use a fallback encoding. |
| (Type::BOOLEAN, _) => false, |
| // Dictionary encoding of FIXED_LEN_BYTE_ARRAY was not enabled in PARQUET 1.0 |
| (Type::FIXED_LEN_BYTE_ARRAY, WriterVersion::PARQUET_1_0) => false, |
| (Type::FIXED_LEN_BYTE_ARRAY, WriterVersion::PARQUET_2_0) => true, |
| _ => true, |
| } |
| } |
| |
| #[inline] |
| fn compare_greater_unsigned_int<T: ParquetValueType>(a: &T, b: &T) -> bool { |
| a.as_u64().unwrap() > b.as_u64().unwrap() |
| } |
| |
| #[inline] |
| fn compare_greater_f16(a: &[u8], b: &[u8]) -> bool { |
| let a = f16::from_le_bytes(a.try_into().unwrap()); |
| let b = f16::from_le_bytes(b.try_into().unwrap()); |
| a > b |
| } |
| |
| /// Signed comparison of byte arrays holding big-endian two's complement values |
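| /// |
| /// A hedged sketch (`ignore`d) of the intended semantics: |
| /// |
| /// ```ignore |
| /// // Big-endian two's complement: [0x10] = 16, [0x90] = -112 |
| /// assert!(compare_greater_byte_array_decimals(&[0x10], &[0x90])); |
| /// // Sign extension: [0xFF, 0x90] and [0x90] both encode -112, so neither is greater |
| /// assert!(!compare_greater_byte_array_decimals(&[0xFF, 0x90], &[0x90])); |
| /// ``` |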
| fn compare_greater_byte_array_decimals(a: &[u8], b: &[u8]) -> bool { |
| let a_length = a.len(); |
| let b_length = b.len(); |
| |
| if a_length == 0 || b_length == 0 { |
| return a_length > 0; |
| } |
| |
| let first_a: u8 = a[0]; |
| let first_b: u8 = b[0]; |
| |
| // We can short circuit for different signed numbers or |
| // for equal length byte arrays that have different first bytes. |
| // The equality requirement is necessary for sign extension cases: |
| // e.g. 0xFF90 equals 0x90 (big-endian sign extension of a negative value). |
| if (0x80 & first_a) != (0x80 & first_b) || (a_length == b_length && first_a != first_b) { |
| return (first_a as i8) > (first_b as i8); |
| } |
| |
| // When the lengths are unequal and the numbers are of the same |
| // sign we need to do comparison by sign extending the shorter |
| // value first; once the (equal) extension bytes are accounted for, |
| // lexicographical unsigned comparison of the remaining bytes is sufficient. |
| |
| let extension: u8 = if (first_a as i8) < 0 { 0xFF } else { 0 }; |
| |
| if a_length != b_length { |
| let not_equal = if a_length > b_length { |
| let lead_length = a_length - b_length; |
| a[0..lead_length].iter().any(|&x| x != extension) |
| } else { |
| let lead_length = b_length - a_length; |
| b[0..lead_length].iter().any(|&x| x != extension) |
| }; |
| |
| if not_equal { |
| let negative_values: bool = (first_a as i8) < 0; |
| let a_longer: bool = a_length > b_length; |
| return if negative_values { !a_longer } else { a_longer }; |
| } |
| } |
| |
| // Compare the remaining bytes of the common length. For equal-length inputs the |
| // first bytes are already known to be equal; for unequal lengths the leading |
| // bytes of the longer input all equal the sign-extension byte. |
| let min_length = a_length.min(b_length); |
| (a[a_length - min_length..]) > (b[b_length - min_length..]) |
| } |
| |
| /// Truncate a UTF-8 slice to the longest prefix that is still a valid UTF-8 string, |
| /// while being at most `length` bytes and non-empty. Returns `None` if truncation |
| /// is not possible within those constraints. |
| /// |
| /// The caller guarantees that data.len() > length. |
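| /// |
| /// A hedged sketch (`ignore`d): truncation never splits a multi-byte character: |
| /// |
| /// ```ignore |
| /// // "été" is 5 bytes; a 4-byte budget falls back to the boundary at 3 ("ét") |
| /// assert_eq!(truncate_utf8("été", 4), Some("ét".as_bytes().to_vec())); |
| /// ``` |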
| fn truncate_utf8(data: &str, length: usize) -> Option<Vec<u8>> { |
| let split = (1..=length).rfind(|x| data.is_char_boundary(*x))?; |
| Some(data.as_bytes()[..split].to_vec()) |
| } |
| |
| /// Truncate a UTF-8 slice and increment its final character. The returned value is the |
| /// longest such slice that is still a valid UTF-8 string while being at most `length` |
| /// bytes and non-empty. Returns `None` if no such transformation is possible. |
| /// |
| /// The caller guarantees that data.len() > length. |
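| /// |
| /// A hedged sketch (`ignore`d): |
| /// |
| /// ```ignore |
| /// // The 4-byte prefix "aaaa" is kept and its last character incremented |
| /// assert_eq!(truncate_and_increment_utf8("aaaaa", 4), Some(b"aaab".to_vec())); |
| /// ``` |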
| fn truncate_and_increment_utf8(data: &str, length: usize) -> Option<Vec<u8>> { |
| // UTF-8 is max 4 bytes, so start search 3 back from desired length |
| let lower_bound = length.saturating_sub(3); |
| let split = (lower_bound..=length).rfind(|x| data.is_char_boundary(*x))?; |
| increment_utf8(data.get(..split)?) |
| } |
| |
| /// Increment the final character in a UTF-8 string in such a way that the returned result |
| /// is still a valid UTF-8 string. The returned string may be shorter than the input if the |
| /// last character(s) cannot be incremented (due to overflow or producing invalid code points). |
| /// Returns `None` if the string cannot be incremented. |
| /// |
| /// Note that this implementation will not promote an N-byte code point to (N+1) bytes. |
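| /// |
| /// A hedged sketch (`ignore`d): |
| /// |
| /// ```ignore |
| /// assert_eq!(increment_utf8("ab"), Some(b"ac".to_vec())); |
| /// // 'ÿ' (U+00FF) increments to 'Ā' (U+0100), which is still 2 bytes in UTF-8 |
| /// assert_eq!(increment_utf8("aÿ"), Some("aĀ".as_bytes().to_vec())); |
| /// ``` |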
| fn increment_utf8(data: &str) -> Option<Vec<u8>> { |
| for (idx, original_char) in data.char_indices().rev() { |
| let original_len = original_char.len_utf8(); |
| if let Some(next_char) = char::from_u32(original_char as u32 + 1) { |
| // do not allow increasing byte width of incremented char |
| if next_char.len_utf8() == original_len { |
| let mut result = data.as_bytes()[..idx + original_len].to_vec(); |
| next_char.encode_utf8(&mut result[idx..]); |
| return Some(result); |
| } |
| } |
| } |
| |
| None |
| } |
| |
| /// Try and increment the bytes from right to left. |
| /// |
| /// Returns `None` if all bytes are set to `u8::MAX`. |
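| /// |
| /// A hedged sketch (`ignore`d): |
| /// |
| /// ```ignore |
| /// assert_eq!(increment(vec![0x00, 0xFF]), Some(vec![0x01, 0x00])); |
| /// assert_eq!(increment(vec![0xFF, 0xFF]), None); // every byte overflows |
| /// ``` |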
| fn increment(mut data: Vec<u8>) -> Option<Vec<u8>> { |
| for byte in data.iter_mut().rev() { |
| let (incremented, overflow) = byte.overflowing_add(1); |
| *byte = incremented; |
| |
| if !overflow { |
| return Some(data); |
| } |
| } |
| |
| None |
| } |
| |
| #[cfg(test)] |
| mod tests { |
| use crate::{ |
| file::{properties::DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH, writer::SerializedFileWriter}, |
| schema::parser::parse_message_type, |
| }; |
| use core::str; |
| use rand::distr::uniform::SampleUniform; |
| use std::{fs::File, sync::Arc}; |
| |
| use crate::column::{ |
| page::PageReader, |
| reader::{ColumnReaderImpl, get_column_reader, get_typed_column_reader}, |
| }; |
| use crate::file::writer::TrackedWrite; |
| use crate::file::{ |
| properties::ReaderProperties, reader::SerializedPageReader, writer::SerializedPageWriter, |
| }; |
| use crate::schema::types::{ColumnPath, Type as SchemaType}; |
| use crate::util::test_common::rand_gen::random_numbers_range; |
| |
| use super::*; |
| |
| #[test] |
| fn test_column_writer_inconsistent_def_rep_length() { |
| let page_writer = get_test_page_writer(); |
| let props = Default::default(); |
| let mut writer = get_test_column_writer::<Int32Type>(page_writer, 1, 1, props); |
| let res = writer.write_batch(&[1, 2, 3, 4], Some(&[1, 1, 1]), Some(&[0, 0])); |
| assert!(res.is_err()); |
| if let Err(err) = res { |
| assert_eq!( |
| format!("{err}"), |
| "Parquet error: Inconsistent length of definition and repetition levels: 3 != 2" |
| ); |
| } |
| } |
| |
| #[test] |
| fn test_column_writer_invalid_def_levels() { |
| let page_writer = get_test_page_writer(); |
| let props = Default::default(); |
| let mut writer = get_test_column_writer::<Int32Type>(page_writer, 1, 0, props); |
| let res = writer.write_batch(&[1, 2, 3, 4], None, None); |
| assert!(res.is_err()); |
| if let Err(err) = res { |
| assert_eq!( |
| format!("{err}"), |
| "Parquet error: Definition levels are required, because max definition level = 1" |
| ); |
| } |
| } |
| |
| #[test] |
| fn test_column_writer_invalid_rep_levels() { |
| let page_writer = get_test_page_writer(); |
| let props = Default::default(); |
| let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 1, props); |
| let res = writer.write_batch(&[1, 2, 3, 4], None, None); |
| assert!(res.is_err()); |
| if let Err(err) = res { |
| assert_eq!( |
| format!("{err}"), |
| "Parquet error: Repetition levels are required, because max repetition level = 1" |
| ); |
| } |
| } |
| |
| #[test] |
| fn test_column_writer_not_enough_values_to_write() { |
| let page_writer = get_test_page_writer(); |
| let props = Default::default(); |
| let mut writer = get_test_column_writer::<Int32Type>(page_writer, 1, 0, props); |
| let res = writer.write_batch(&[1, 2], Some(&[1, 1, 1, 1]), None); |
| assert!(res.is_err()); |
| if let Err(err) = res { |
| assert_eq!( |
| format!("{err}"), |
| "Parquet error: Expected to write 4 values, but have only 2" |
| ); |
| } |
| } |
| |
| #[test] |
| fn test_column_writer_write_only_one_dictionary_page() { |
| let page_writer = get_test_page_writer(); |
| let props = Default::default(); |
| let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props); |
| writer.write_batch(&[1, 2, 3, 4], None, None).unwrap(); |
| // First page should be correctly written. |
| writer.add_data_page().unwrap(); |
| writer.write_dictionary_page().unwrap(); |
| let err = writer.write_dictionary_page().unwrap_err().to_string(); |
| assert_eq!(err, "Parquet error: Dictionary encoder is not set"); |
| } |
| |
| #[test] |
| fn test_column_writer_error_when_writing_disabled_dictionary() { |
| let page_writer = get_test_page_writer(); |
| let props = Arc::new( |
| WriterProperties::builder() |
| .set_dictionary_enabled(false) |
| .build(), |
| ); |
| let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props); |
| writer.write_batch(&[1, 2, 3, 4], None, None).unwrap(); |
| let err = writer.write_dictionary_page().unwrap_err().to_string(); |
| assert_eq!(err, "Parquet error: Dictionary encoder is not set"); |
| } |
| |
| #[test] |
| fn test_column_writer_boolean_type_does_not_support_dictionary() { |
| let page_writer = get_test_page_writer(); |
| let props = Arc::new( |
| WriterProperties::builder() |
| .set_dictionary_enabled(true) |
| .build(), |
| ); |
| let mut writer = get_test_column_writer::<BoolType>(page_writer, 0, 0, props); |
| writer |
| .write_batch(&[true, false, true, false], None, None) |
| .unwrap(); |
| |
| let r = writer.close().unwrap(); |
| // PlainEncoder uses bit writer to write boolean values, which all fit into 1 |
| // byte. |
| assert_eq!(r.bytes_written, 1); |
| assert_eq!(r.rows_written, 4); |
| |
| let metadata = r.metadata; |
| assert_eq!( |
| metadata.encodings().collect::<Vec<_>>(), |
| vec![Encoding::PLAIN, Encoding::RLE] |
| ); |
| assert_eq!(metadata.num_values(), 4); // just values |
| assert_eq!(metadata.dictionary_page_offset(), None); |
| } |
| |
| #[test] |
| fn test_column_writer_default_encoding_support_bool() { |
| check_encoding_write_support::<BoolType>( |
| WriterVersion::PARQUET_1_0, |
| true, |
| &[true, false], |
| None, |
| &[Encoding::PLAIN, Encoding::RLE], |
| &[encoding_stats(PageType::DATA_PAGE, Encoding::PLAIN, 1)], |
| ); |
| check_encoding_write_support::<BoolType>( |
| WriterVersion::PARQUET_1_0, |
| false, |
| &[true, false], |
| None, |
| &[Encoding::PLAIN, Encoding::RLE], |
| &[encoding_stats(PageType::DATA_PAGE, Encoding::PLAIN, 1)], |
| ); |
| check_encoding_write_support::<BoolType>( |
| WriterVersion::PARQUET_2_0, |
| true, |
| &[true, false], |
| None, |
| &[Encoding::RLE], |
| &[encoding_stats(PageType::DATA_PAGE_V2, Encoding::RLE, 1)], |
| ); |
| check_encoding_write_support::<BoolType>( |
| WriterVersion::PARQUET_2_0, |
| false, |
| &[true, false], |
| None, |
| &[Encoding::RLE], |
| &[encoding_stats(PageType::DATA_PAGE_V2, Encoding::RLE, 1)], |
| ); |
| } |
| |
| #[test] |
| fn test_column_writer_default_encoding_support_int32() { |
| check_encoding_write_support::<Int32Type>( |
| WriterVersion::PARQUET_1_0, |
| true, |
| &[1, 2], |
| Some(0), |
| &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY], |
| &[ |
| encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1), |
| encoding_stats(PageType::DATA_PAGE, Encoding::RLE_DICTIONARY, 1), |
| ], |
| ); |
| check_encoding_write_support::<Int32Type>( |
| WriterVersion::PARQUET_1_0, |
| false, |
| &[1, 2], |
| None, |
| &[Encoding::PLAIN, Encoding::RLE], |
| &[encoding_stats(PageType::DATA_PAGE, Encoding::PLAIN, 1)], |
| ); |
| check_encoding_write_support::<Int32Type>( |
| WriterVersion::PARQUET_2_0, |
| true, |
| &[1, 2], |
| Some(0), |
| &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY], |
| &[ |
| encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1), |
| encoding_stats(PageType::DATA_PAGE_V2, Encoding::RLE_DICTIONARY, 1), |
| ], |
| ); |
| check_encoding_write_support::<Int32Type>( |
| WriterVersion::PARQUET_2_0, |
| false, |
| &[1, 2], |
| None, |
| &[Encoding::RLE, Encoding::DELTA_BINARY_PACKED], |
| &[encoding_stats( |
| PageType::DATA_PAGE_V2, |
| Encoding::DELTA_BINARY_PACKED, |
| 1, |
| )], |
| ); |
| } |
| |
| #[test] |
| fn test_column_writer_default_encoding_support_int64() { |
| check_encoding_write_support::<Int64Type>( |
| WriterVersion::PARQUET_1_0, |
| true, |
| &[1, 2], |
| Some(0), |
| &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY], |
| &[ |
| encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1), |
| encoding_stats(PageType::DATA_PAGE, Encoding::RLE_DICTIONARY, 1), |
| ], |
| ); |
| check_encoding_write_support::<Int64Type>( |
| WriterVersion::PARQUET_1_0, |
| false, |
| &[1, 2], |
| None, |
| &[Encoding::PLAIN, Encoding::RLE], |
| &[encoding_stats(PageType::DATA_PAGE, Encoding::PLAIN, 1)], |
| ); |
| check_encoding_write_support::<Int64Type>( |
| WriterVersion::PARQUET_2_0, |
| true, |
| &[1, 2], |
| Some(0), |
| &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY], |
| &[ |
| encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1), |
| encoding_stats(PageType::DATA_PAGE_V2, Encoding::RLE_DICTIONARY, 1), |
| ], |
| ); |
| check_encoding_write_support::<Int64Type>( |
| WriterVersion::PARQUET_2_0, |
| false, |
| &[1, 2], |
| None, |
| &[Encoding::RLE, Encoding::DELTA_BINARY_PACKED], |
| &[encoding_stats( |
| PageType::DATA_PAGE_V2, |
| Encoding::DELTA_BINARY_PACKED, |
| 1, |
| )], |
| ); |
| } |
| |
| #[test] |
| fn test_column_writer_default_encoding_support_int96() { |
| check_encoding_write_support::<Int96Type>( |
| WriterVersion::PARQUET_1_0, |
| true, |
| &[Int96::from(vec![1, 2, 3])], |
| Some(0), |
| &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY], |
| &[ |
| encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1), |
| encoding_stats(PageType::DATA_PAGE, Encoding::RLE_DICTIONARY, 1), |
| ], |
| ); |
| check_encoding_write_support::<Int96Type>( |
| WriterVersion::PARQUET_1_0, |
| false, |
| &[Int96::from(vec![1, 2, 3])], |
| None, |
| &[Encoding::PLAIN, Encoding::RLE], |
| &[encoding_stats(PageType::DATA_PAGE, Encoding::PLAIN, 1)], |
| ); |
| check_encoding_write_support::<Int96Type>( |
| WriterVersion::PARQUET_2_0, |
| true, |
| &[Int96::from(vec![1, 2, 3])], |
| Some(0), |
| &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY], |
| &[ |
| encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1), |
| encoding_stats(PageType::DATA_PAGE_V2, Encoding::RLE_DICTIONARY, 1), |
| ], |
| ); |
| check_encoding_write_support::<Int96Type>( |
| WriterVersion::PARQUET_2_0, |
| false, |
| &[Int96::from(vec![1, 2, 3])], |
| None, |
| &[Encoding::PLAIN, Encoding::RLE], |
| &[encoding_stats(PageType::DATA_PAGE_V2, Encoding::PLAIN, 1)], |
| ); |
| } |
| |
| #[test] |
| fn test_column_writer_default_encoding_support_float() { |
| check_encoding_write_support::<FloatType>( |
| WriterVersion::PARQUET_1_0, |
| true, |
| &[1.0, 2.0], |
| Some(0), |
| &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY], |
| &[ |
| encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1), |
| encoding_stats(PageType::DATA_PAGE, Encoding::RLE_DICTIONARY, 1), |
| ], |
| ); |
| check_encoding_write_support::<FloatType>( |
| WriterVersion::PARQUET_1_0, |
| false, |
| &[1.0, 2.0], |
| None, |
| &[Encoding::PLAIN, Encoding::RLE], |
| &[encoding_stats(PageType::DATA_PAGE, Encoding::PLAIN, 1)], |
| ); |
| check_encoding_write_support::<FloatType>( |
| WriterVersion::PARQUET_2_0, |
| true, |
| &[1.0, 2.0], |
| Some(0), |
| &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY], |
| &[ |
| encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1), |
| encoding_stats(PageType::DATA_PAGE_V2, Encoding::RLE_DICTIONARY, 1), |
| ], |
| ); |
| check_encoding_write_support::<FloatType>( |
| WriterVersion::PARQUET_2_0, |
| false, |
| &[1.0, 2.0], |
| None, |
| &[Encoding::PLAIN, Encoding::RLE], |
| &[encoding_stats(PageType::DATA_PAGE_V2, Encoding::PLAIN, 1)], |
| ); |
| } |
| |
| #[test] |
| fn test_column_writer_default_encoding_support_double() { |
| check_encoding_write_support::<DoubleType>( |
| WriterVersion::PARQUET_1_0, |
| true, |
| &[1.0, 2.0], |
| Some(0), |
| &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY], |
| &[ |
| encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1), |
| encoding_stats(PageType::DATA_PAGE, Encoding::RLE_DICTIONARY, 1), |
| ], |
| ); |
| check_encoding_write_support::<DoubleType>( |
| WriterVersion::PARQUET_1_0, |
| false, |
| &[1.0, 2.0], |
| None, |
| &[Encoding::PLAIN, Encoding::RLE], |
| &[encoding_stats(PageType::DATA_PAGE, Encoding::PLAIN, 1)], |
| ); |
| check_encoding_write_support::<DoubleType>( |
| WriterVersion::PARQUET_2_0, |
| true, |
| &[1.0, 2.0], |
| Some(0), |
| &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY], |
| &[ |
| encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1), |
| encoding_stats(PageType::DATA_PAGE_V2, Encoding::RLE_DICTIONARY, 1), |
| ], |
| ); |
| check_encoding_write_support::<DoubleType>( |
| WriterVersion::PARQUET_2_0, |
| false, |
| &[1.0, 2.0], |
| None, |
| &[Encoding::PLAIN, Encoding::RLE], |
| &[encoding_stats(PageType::DATA_PAGE_V2, Encoding::PLAIN, 1)], |
| ); |
| } |
| |
| #[test] |
| fn test_column_writer_default_encoding_support_byte_array() { |
| check_encoding_write_support::<ByteArrayType>( |
| WriterVersion::PARQUET_1_0, |
| true, |
| &[ByteArray::from(vec![1u8])], |
| Some(0), |
| &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY], |
| &[ |
| encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1), |
| encoding_stats(PageType::DATA_PAGE, Encoding::RLE_DICTIONARY, 1), |
| ], |
| ); |
| check_encoding_write_support::<ByteArrayType>( |
| WriterVersion::PARQUET_1_0, |
| false, |
| &[ByteArray::from(vec![1u8])], |
| None, |
| &[Encoding::PLAIN, Encoding::RLE], |
| &[encoding_stats(PageType::DATA_PAGE, Encoding::PLAIN, 1)], |
| ); |
| check_encoding_write_support::<ByteArrayType>( |
| WriterVersion::PARQUET_2_0, |
| true, |
| &[ByteArray::from(vec![1u8])], |
| Some(0), |
| &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY], |
| &[ |
| encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1), |
| encoding_stats(PageType::DATA_PAGE_V2, Encoding::RLE_DICTIONARY, 1), |
| ], |
| ); |
| check_encoding_write_support::<ByteArrayType>( |
| WriterVersion::PARQUET_2_0, |
| false, |
| &[ByteArray::from(vec![1u8])], |
| None, |
| &[Encoding::RLE, Encoding::DELTA_BYTE_ARRAY], |
| &[encoding_stats( |
| PageType::DATA_PAGE_V2, |
| Encoding::DELTA_BYTE_ARRAY, |
| 1, |
| )], |
| ); |
| } |
| |
| #[test] |
| fn test_column_writer_default_encoding_support_fixed_len_byte_array() { |
| check_encoding_write_support::<FixedLenByteArrayType>( |
| WriterVersion::PARQUET_1_0, |
| true, |
| &[ByteArray::from(vec![1u8]).into()], |
| None, |
| &[Encoding::PLAIN, Encoding::RLE], |
| &[encoding_stats(PageType::DATA_PAGE, Encoding::PLAIN, 1)], |
| ); |
| check_encoding_write_support::<FixedLenByteArrayType>( |
| WriterVersion::PARQUET_1_0, |
| false, |
| &[ByteArray::from(vec![1u8]).into()], |
| None, |
| &[Encoding::PLAIN, Encoding::RLE], |
| &[encoding_stats(PageType::DATA_PAGE, Encoding::PLAIN, 1)], |
| ); |
| check_encoding_write_support::<FixedLenByteArrayType>( |
| WriterVersion::PARQUET_2_0, |
| true, |
| &[ByteArray::from(vec![1u8]).into()], |
| Some(0), |
| &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY], |
| &[ |
| encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1), |
| encoding_stats(PageType::DATA_PAGE_V2, Encoding::RLE_DICTIONARY, 1), |
| ], |
| ); |
| check_encoding_write_support::<FixedLenByteArrayType>( |
| WriterVersion::PARQUET_2_0, |
| false, |
| &[ByteArray::from(vec![1u8]).into()], |
| None, |
| &[Encoding::RLE, Encoding::DELTA_BYTE_ARRAY], |
| &[encoding_stats( |
| PageType::DATA_PAGE_V2, |
| Encoding::DELTA_BYTE_ARRAY, |
| 1, |
| )], |
| ); |
| } |
| |
| #[test] |
| fn test_column_writer_check_metadata() { |
| let page_writer = get_test_page_writer(); |
| let props = Default::default(); |
| let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props); |
| writer.write_batch(&[1, 2, 3, 4], None, None).unwrap(); |
| |
| let r = writer.close().unwrap(); |
| assert_eq!(r.bytes_written, 20); |
| assert_eq!(r.rows_written, 4); |
| |
| let metadata = r.metadata; |
| assert_eq!( |
| metadata.encodings().collect::<Vec<_>>(), |
| vec![Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY] |
| ); |
| assert_eq!(metadata.num_values(), 4); |
| assert_eq!(metadata.compressed_size(), 20); |
| assert_eq!(metadata.uncompressed_size(), 20); |
| assert_eq!(metadata.data_page_offset(), 0); |
| assert_eq!(metadata.dictionary_page_offset(), Some(0)); |
| if let Some(stats) = metadata.statistics() { |
| assert_eq!(stats.null_count_opt(), Some(0)); |
| assert_eq!(stats.distinct_count_opt(), None); |
| if let Statistics::Int32(stats) = stats { |
| assert_eq!(stats.min_opt().unwrap(), &1); |
| assert_eq!(stats.max_opt().unwrap(), &4); |
| } else { |
| panic!("expecting Statistics::Int32"); |
| } |
| } else { |
| panic!("metadata missing statistics"); |
| } |
| } |
| |
| #[test] |
| fn test_column_writer_check_byte_array_min_max() { |
| let page_writer = get_test_page_writer(); |
| let props = Default::default(); |
| let mut writer = get_test_decimals_column_writer::<ByteArrayType>(page_writer, 0, 0, props); |
| writer |
| .write_batch( |
| &[ |
| ByteArray::from(vec![ |
| 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 179u8, 172u8, 19u8, |
| 35u8, 231u8, 90u8, 0u8, 0u8, |
| ]), |
| ByteArray::from(vec![ |
| 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 228u8, 62u8, 146u8, |
| 152u8, 177u8, 56u8, 0u8, 0u8, |
| ]), |
| ByteArray::from(vec![ |
| 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, |
| 0u8, |
| ]), |
| ByteArray::from(vec![ |
| 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 41u8, 162u8, 36u8, 26u8, 246u8, |
| 44u8, 0u8, 0u8, |
| ]), |
| ], |
| None, |
| None, |
| ) |
| .unwrap(); |
| let metadata = writer.close().unwrap().metadata; |
| if let Some(stats) = metadata.statistics() { |
| if let Statistics::ByteArray(stats) = stats { |
| assert_eq!( |
| stats.min_opt().unwrap(), |
| &ByteArray::from(vec![ |
| 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 179u8, 172u8, 19u8, |
| 35u8, 231u8, 90u8, 0u8, 0u8, |
| ]) |
| ); |
| assert_eq!( |
| stats.max_opt().unwrap(), |
| &ByteArray::from(vec![ |
| 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 41u8, 162u8, 36u8, 26u8, 246u8, |
| 44u8, 0u8, 0u8, |
| ]) |
| ); |
| } else { |
| panic!("expecting Statistics::ByteArray"); |
| } |
| } else { |
| panic!("metadata missing statistics"); |
| } |
| } |
| |
| #[test] |
| fn test_column_writer_uint32_converted_type_min_max() { |
| let page_writer = get_test_page_writer(); |
| let props = Default::default(); |
| let mut writer = get_test_unsigned_int_given_as_converted_column_writer::<Int32Type>( |
| page_writer, |
| 0, |
| 0, |
| props, |
| ); |
| writer.write_batch(&[0, 1, 2, 3, 4, 5], None, None).unwrap(); |
| let metadata = writer.close().unwrap().metadata; |
| if let Some(stats) = metadata.statistics() { |
| if let Statistics::Int32(stats) = stats { |
| assert_eq!(stats.min_opt().unwrap(), &0);
| assert_eq!(stats.max_opt().unwrap(), &5);
| } else { |
| panic!("expecting Statistics::Int32"); |
| } |
| } else { |
| panic!("metadata missing statistics"); |
| } |
| } |
| |
| #[test] |
| fn test_column_writer_precalculated_statistics() { |
| let page_writer = get_test_page_writer(); |
| let props = Arc::new( |
| WriterProperties::builder() |
| .set_statistics_enabled(EnabledStatistics::Chunk) |
| .build(), |
| ); |
| let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props); |
| writer |
| .write_batch_with_statistics( |
| &[1, 2, 3, 4], |
| None, |
| None, |
| Some(&-17), |
| Some(&9000), |
| Some(55), |
| ) |
| .unwrap(); |
| |
| let r = writer.close().unwrap(); |
| assert_eq!(r.bytes_written, 20); |
| assert_eq!(r.rows_written, 4); |
| |
| let metadata = r.metadata; |
| assert_eq!( |
| metadata.encodings().collect::<Vec<_>>(), |
| vec![Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY] |
| ); |
| assert_eq!(metadata.num_values(), 4); |
| assert_eq!(metadata.compressed_size(), 20); |
| assert_eq!(metadata.uncompressed_size(), 20); |
| assert_eq!(metadata.data_page_offset(), 0); |
| assert_eq!(metadata.dictionary_page_offset(), Some(0)); |
| if let Some(stats) = metadata.statistics() { |
| assert_eq!(stats.null_count_opt(), Some(0)); |
| assert_eq!(stats.distinct_count_opt().unwrap_or(0), 55); |
| if let Statistics::Int32(stats) = stats { |
| assert_eq!(stats.min_opt().unwrap(), &-17); |
| assert_eq!(stats.max_opt().unwrap(), &9000); |
| } else { |
| panic!("expecting Statistics::Int32"); |
| } |
| } else { |
| panic!("metadata missing statistics"); |
| } |
| } |
| |
| #[test] |
| fn test_mixed_precomputed_statistics() { |
| let mut buf = Vec::with_capacity(100); |
| let mut write = TrackedWrite::new(&mut buf); |
| let page_writer = Box::new(SerializedPageWriter::new(&mut write)); |
| let props = Arc::new( |
| WriterProperties::builder() |
| .set_write_page_header_statistics(true) |
| .build(), |
| ); |
| let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props); |
| |
| writer.write_batch(&[1, 2, 3, 4], None, None).unwrap(); |
| writer |
| .write_batch_with_statistics(&[5, 6, 7], None, None, Some(&5), Some(&7), Some(3)) |
| .unwrap(); |
| |
| let r = writer.close().unwrap(); |
| |
| let stats = r.metadata.statistics().unwrap(); |
| assert_eq!(stats.min_bytes_opt().unwrap(), 1_i32.to_le_bytes()); |
| assert_eq!(stats.max_bytes_opt().unwrap(), 7_i32.to_le_bytes()); |
| assert_eq!(stats.null_count_opt(), Some(0)); |
| assert!(stats.distinct_count_opt().is_none()); |
| |
| drop(write); |
| |
| let props = ReaderProperties::builder() |
| .set_backward_compatible_lz4(false) |
| .set_read_page_statistics(true) |
| .build(); |
| let reader = SerializedPageReader::new_with_properties( |
| Arc::new(Bytes::from(buf)), |
| &r.metadata, |
| r.rows_written as usize, |
| None, |
| Arc::new(props), |
| ) |
| .unwrap(); |
| |
| let pages = reader.collect::<Result<Vec<_>>>().unwrap(); |
| assert_eq!(pages.len(), 2); |
| |
| assert_eq!(pages[0].page_type(), PageType::DICTIONARY_PAGE); |
| assert_eq!(pages[1].page_type(), PageType::DATA_PAGE); |
| |
| let page_statistics = pages[1].statistics().unwrap(); |
| assert_eq!( |
| page_statistics.min_bytes_opt().unwrap(), |
| 1_i32.to_le_bytes() |
| ); |
| assert_eq!( |
| page_statistics.max_bytes_opt().unwrap(), |
| 7_i32.to_le_bytes() |
| ); |
| assert_eq!(page_statistics.null_count_opt(), Some(0)); |
| assert!(page_statistics.distinct_count_opt().is_none()); |
| } |
| |
| #[test] |
| fn test_disabled_statistics() { |
| let mut buf = Vec::with_capacity(100); |
| let mut write = TrackedWrite::new(&mut buf); |
| let page_writer = Box::new(SerializedPageWriter::new(&mut write)); |
| let props = WriterProperties::builder() |
| .set_statistics_enabled(EnabledStatistics::None) |
| .set_writer_version(WriterVersion::PARQUET_2_0) |
| .build(); |
| let props = Arc::new(props); |
| |
| let mut writer = get_test_column_writer::<Int32Type>(page_writer, 1, 0, props); |
| writer |
| .write_batch(&[1, 2, 3, 4], Some(&[1, 0, 0, 1, 1, 1]), None) |
| .unwrap(); |
| |
| let r = writer.close().unwrap(); |
| assert!(r.metadata.statistics().is_none()); |
| |
| drop(write); |
| |
| let props = ReaderProperties::builder() |
| .set_backward_compatible_lz4(false) |
| .build(); |
| let reader = SerializedPageReader::new_with_properties( |
| Arc::new(Bytes::from(buf)), |
| &r.metadata, |
| r.rows_written as usize, |
| None, |
| Arc::new(props), |
| ) |
| .unwrap(); |
| |
| let pages = reader.collect::<Result<Vec<_>>>().unwrap(); |
| assert_eq!(pages.len(), 2); |
| |
| assert_eq!(pages[0].page_type(), PageType::DICTIONARY_PAGE); |
| assert_eq!(pages[1].page_type(), PageType::DATA_PAGE_V2); |
| |
| match &pages[1] { |
| Page::DataPageV2 { |
| num_values, |
| num_nulls, |
| num_rows, |
| statistics, |
| .. |
| } => { |
| assert_eq!(*num_values, 6); |
| assert_eq!(*num_nulls, 2); |
| assert_eq!(*num_rows, 6); |
| assert!(statistics.is_none()); |
| } |
| _ => unreachable!(), |
| } |
| } |
| |
| #[test] |
| fn test_column_writer_empty_column_roundtrip() { |
| let props = Default::default(); |
| column_roundtrip::<Int32Type>(props, &[], None, None); |
| } |
| |
| #[test] |
| fn test_column_writer_non_nullable_values_roundtrip() { |
| let props = Default::default(); |
| column_roundtrip_random::<Int32Type>(props, 1024, i32::MIN, i32::MAX, 0, 0); |
| } |
| |
| #[test] |
| fn test_column_writer_nullable_non_repeated_values_roundtrip() { |
| let props = Default::default(); |
| column_roundtrip_random::<Int32Type>(props, 1024, i32::MIN, i32::MAX, 10, 0); |
| } |
| |
| #[test] |
| fn test_column_writer_nullable_repeated_values_roundtrip() { |
| let props = Default::default(); |
| column_roundtrip_random::<Int32Type>(props, 1024, i32::MIN, i32::MAX, 10, 10); |
| } |
| |
| #[test] |
| fn test_column_writer_dictionary_fallback_small_data_page() { |
| let props = WriterProperties::builder() |
| .set_dictionary_page_size_limit(32) |
| .set_data_page_size_limit(32) |
| .build(); |
| column_roundtrip_random::<Int32Type>(props, 1024, i32::MIN, i32::MAX, 10, 10); |
| } |
| |
| #[test] |
| fn test_column_writer_small_write_batch_size() { |
| for i in &[1usize, 2, 5, 10, 11, 1023] { |
| let props = WriterProperties::builder().set_write_batch_size(*i).build(); |
| |
| column_roundtrip_random::<Int32Type>(props, 1024, i32::MIN, i32::MAX, 10, 10); |
| } |
| } |
| |
| #[test] |
| fn test_column_writer_dictionary_disabled_v1() { |
| let props = WriterProperties::builder() |
| .set_writer_version(WriterVersion::PARQUET_1_0) |
| .set_dictionary_enabled(false) |
| .build(); |
| column_roundtrip_random::<Int32Type>(props, 1024, i32::MIN, i32::MAX, 10, 10); |
| } |
| |
| #[test] |
| fn test_column_writer_dictionary_disabled_v2() { |
| let props = WriterProperties::builder() |
| .set_writer_version(WriterVersion::PARQUET_2_0) |
| .set_dictionary_enabled(false) |
| .build(); |
| column_roundtrip_random::<Int32Type>(props, 1024, i32::MIN, i32::MAX, 10, 10); |
| } |
| |
| #[test] |
| fn test_column_writer_compression_v1() { |
| let props = WriterProperties::builder() |
| .set_writer_version(WriterVersion::PARQUET_1_0) |
| .set_compression(Compression::SNAPPY) |
| .build(); |
| column_roundtrip_random::<Int32Type>(props, 2048, i32::MIN, i32::MAX, 10, 10); |
| } |
| |
| #[test] |
| fn test_column_writer_compression_v2() { |
| let props = WriterProperties::builder() |
| .set_writer_version(WriterVersion::PARQUET_2_0) |
| .set_compression(Compression::SNAPPY) |
| .build(); |
| column_roundtrip_random::<Int32Type>(props, 2048, i32::MIN, i32::MAX, 10, 10); |
| } |
| |
| #[test] |
| fn test_column_writer_add_data_pages_with_dict() { |
| // ARROW-5129: Test verifies that we add a data page when using dictionary encoding
| // and no fallback has occurred so far.
| let mut file = tempfile::tempfile().unwrap(); |
| let mut write = TrackedWrite::new(&mut file); |
| let page_writer = Box::new(SerializedPageWriter::new(&mut write)); |
| let props = Arc::new( |
| WriterProperties::builder() |
| .set_data_page_size_limit(10) |
| .set_write_batch_size(3) // write 3 values at a time |
| .build(), |
| ); |
| let data = &[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]; |
| let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props); |
| writer.write_batch(data, None, None).unwrap(); |
| let r = writer.close().unwrap(); |
| |
| drop(write); |
| |
| // Read pages and check the sequence |
| let props = ReaderProperties::builder() |
| .set_backward_compatible_lz4(false) |
| .build(); |
| let mut page_reader = Box::new( |
| SerializedPageReader::new_with_properties( |
| Arc::new(file), |
| &r.metadata, |
| r.rows_written as usize, |
| None, |
| Arc::new(props), |
| ) |
| .unwrap(), |
| ); |
| let mut res = Vec::new(); |
| while let Some(page) = page_reader.get_next_page().unwrap() { |
| res.push((page.page_type(), page.num_values(), page.buffer().len())); |
| } |
| assert_eq!( |
| res, |
| vec![ |
| (PageType::DICTIONARY_PAGE, 10, 40), |
| (PageType::DATA_PAGE, 9, 10), |
| (PageType::DATA_PAGE, 1, 3), |
| ] |
| ); |
| assert_eq!( |
| r.metadata.page_encoding_stats(), |
| Some(&vec![ |
| PageEncodingStats { |
| page_type: PageType::DICTIONARY_PAGE, |
| encoding: Encoding::PLAIN, |
| count: 1 |
| }, |
| PageEncodingStats { |
| page_type: PageType::DATA_PAGE, |
| encoding: Encoding::RLE_DICTIONARY, |
| count: 2, |
| } |
| ]) |
| ); |
| } |
| |
| #[test] |
| fn test_bool_statistics() { |
| let stats = statistics_roundtrip::<BoolType>(&[true, false, false, true]); |
| // Booleans have an unsigned sort order and so are not compatible |
| // with the deprecated `min` and `max` statistics |
| assert!(!stats.is_min_max_backwards_compatible()); |
| if let Statistics::Boolean(stats) = stats { |
| assert_eq!(stats.min_opt().unwrap(), &false); |
| assert_eq!(stats.max_opt().unwrap(), &true); |
| } else { |
| panic!("expecting Statistics::Boolean, got {stats:?}"); |
| } |
| } |
| |
| #[test] |
| fn test_int32_statistics() { |
| let stats = statistics_roundtrip::<Int32Type>(&[-1, 3, -2, 2]); |
| assert!(stats.is_min_max_backwards_compatible()); |
| if let Statistics::Int32(stats) = stats { |
| assert_eq!(stats.min_opt().unwrap(), &-2); |
| assert_eq!(stats.max_opt().unwrap(), &3); |
| } else { |
| panic!("expecting Statistics::Int32, got {stats:?}"); |
| } |
| } |
| |
| #[test] |
| fn test_int64_statistics() { |
| let stats = statistics_roundtrip::<Int64Type>(&[-1, 3, -2, 2]); |
| assert!(stats.is_min_max_backwards_compatible()); |
| if let Statistics::Int64(stats) = stats { |
| assert_eq!(stats.min_opt().unwrap(), &-2); |
| assert_eq!(stats.max_opt().unwrap(), &3); |
| } else { |
| panic!("expecting Statistics::Int64, got {stats:?}"); |
| } |
| } |
| |
| #[test] |
| fn test_int96_statistics() { |
| let input = vec![
| Int96::from(vec![1, 20, 30]),
| Int96::from(vec![3, 20, 10]),
| Int96::from(vec![0, 20, 30]),
| Int96::from(vec![2, 20, 30]),
| ];
| |
| let stats = statistics_roundtrip::<Int96Type>(&input); |
| assert!(!stats.is_min_max_backwards_compatible()); |
| if let Statistics::Int96(stats) = stats { |
| assert_eq!(stats.min_opt().unwrap(), &Int96::from(vec![3, 20, 10])); |
| assert_eq!(stats.max_opt().unwrap(), &Int96::from(vec![2, 20, 30])); |
| } else { |
| panic!("expecting Statistics::Int96, got {stats:?}"); |
| } |
| } |
| |
| #[test] |
| fn test_float_statistics() { |
| let stats = statistics_roundtrip::<FloatType>(&[-1.0, 3.0, -2.0, 2.0]); |
| assert!(stats.is_min_max_backwards_compatible()); |
| if let Statistics::Float(stats) = stats { |
| assert_eq!(stats.min_opt().unwrap(), &-2.0); |
| assert_eq!(stats.max_opt().unwrap(), &3.0); |
| } else { |
| panic!("expecting Statistics::Float, got {stats:?}"); |
| } |
| } |
| |
| #[test] |
| fn test_double_statistics() { |
| let stats = statistics_roundtrip::<DoubleType>(&[-1.0, 3.0, -2.0, 2.0]); |
| assert!(stats.is_min_max_backwards_compatible()); |
| if let Statistics::Double(stats) = stats { |
| assert_eq!(stats.min_opt().unwrap(), &-2.0); |
| assert_eq!(stats.max_opt().unwrap(), &3.0); |
| } else { |
| panic!("expecting Statistics::Double, got {stats:?}"); |
| } |
| } |
| |
| #[test] |
| fn test_byte_array_statistics() { |
| let input = ["aawaa", "zz", "aaw", "m", "qrs"] |
| .iter() |
| .map(|&s| s.into()) |
| .collect::<Vec<_>>(); |
| |
| let stats = statistics_roundtrip::<ByteArrayType>(&input); |
| assert!(!stats.is_min_max_backwards_compatible()); |
| if let Statistics::ByteArray(stats) = stats { |
| assert_eq!(stats.min_opt().unwrap(), &ByteArray::from("aaw")); |
| assert_eq!(stats.max_opt().unwrap(), &ByteArray::from("zz")); |
| } else { |
| panic!("expecting Statistics::ByteArray, got {stats:?}"); |
| } |
| } |
| |
| #[test] |
| fn test_fixed_len_byte_array_statistics() { |
| let input = ["aawaa", "zz ", "aaw ", "m ", "qrs "] |
| .iter() |
| .map(|&s| ByteArray::from(s).into()) |
| .collect::<Vec<_>>(); |
| |
| let stats = statistics_roundtrip::<FixedLenByteArrayType>(&input); |
| assert!(!stats.is_min_max_backwards_compatible()); |
| if let Statistics::FixedLenByteArray(stats) = stats { |
| let expected_min: FixedLenByteArray = ByteArray::from("aaw ").into(); |
| assert_eq!(stats.min_opt().unwrap(), &expected_min); |
| let expected_max: FixedLenByteArray = ByteArray::from("zz ").into(); |
| assert_eq!(stats.max_opt().unwrap(), &expected_max); |
| } else { |
| panic!("expecting Statistics::FixedLenByteArray, got {stats:?}"); |
| } |
| } |
| |
| #[test] |
| fn test_column_writer_check_float16_min_max() { |
| let input = [ |
| -f16::ONE, |
| f16::from_f32(3.0), |
| -f16::from_f32(2.0), |
| f16::from_f32(2.0), |
| ] |
| .into_iter() |
| .map(|s| ByteArray::from(s).into()) |
| .collect::<Vec<_>>(); |
| |
| let stats = float16_statistics_roundtrip(&input); |
| assert!(stats.is_min_max_backwards_compatible()); |
| assert_eq!( |
| stats.min_opt().unwrap(), |
| &ByteArray::from(-f16::from_f32(2.0)) |
| ); |
| assert_eq!( |
| stats.max_opt().unwrap(), |
| &ByteArray::from(f16::from_f32(3.0)) |
| ); |
| } |
| |
| #[test] |
| fn test_column_writer_check_float16_nan_middle() { |
| let input = [f16::ONE, f16::NAN, f16::ONE + f16::ONE] |
| .into_iter() |
| .map(|s| ByteArray::from(s).into()) |
| .collect::<Vec<_>>(); |
| |
| let stats = float16_statistics_roundtrip(&input); |
| assert!(stats.is_min_max_backwards_compatible()); |
| assert_eq!(stats.min_opt().unwrap(), &ByteArray::from(f16::ONE)); |
| assert_eq!( |
| stats.max_opt().unwrap(), |
| &ByteArray::from(f16::ONE + f16::ONE) |
| ); |
| } |
| |
| #[test] |
| fn test_float16_statistics_nan_middle() { |
| let input = [f16::ONE, f16::NAN, f16::ONE + f16::ONE] |
| .into_iter() |
| .map(|s| ByteArray::from(s).into()) |
| .collect::<Vec<_>>(); |
| |
| let stats = float16_statistics_roundtrip(&input); |
| assert!(stats.is_min_max_backwards_compatible()); |
| assert_eq!(stats.min_opt().unwrap(), &ByteArray::from(f16::ONE)); |
| assert_eq!( |
| stats.max_opt().unwrap(), |
| &ByteArray::from(f16::ONE + f16::ONE) |
| ); |
| } |
| |
| #[test] |
| fn test_float16_statistics_nan_start() { |
| let input = [f16::NAN, f16::ONE, f16::ONE + f16::ONE] |
| .into_iter() |
| .map(|s| ByteArray::from(s).into()) |
| .collect::<Vec<_>>(); |
| |
| let stats = float16_statistics_roundtrip(&input); |
| assert!(stats.is_min_max_backwards_compatible()); |
| assert_eq!(stats.min_opt().unwrap(), &ByteArray::from(f16::ONE)); |
| assert_eq!( |
| stats.max_opt().unwrap(), |
| &ByteArray::from(f16::ONE + f16::ONE) |
| ); |
| } |
| |
| #[test] |
| fn test_float16_statistics_nan_only() { |
| let input = [f16::NAN, f16::NAN] |
| .into_iter() |
| .map(|s| ByteArray::from(s).into()) |
| .collect::<Vec<_>>(); |
| |
| let stats = float16_statistics_roundtrip(&input); |
| assert!(stats.min_bytes_opt().is_none()); |
| assert!(stats.max_bytes_opt().is_none()); |
| assert!(stats.is_min_max_backwards_compatible()); |
| } |
| |
| #[test] |
| fn test_float16_statistics_zero_only() { |
| let input = [f16::ZERO] |
| .into_iter() |
| .map(|s| ByteArray::from(s).into()) |
| .collect::<Vec<_>>(); |
| |
| let stats = float16_statistics_roundtrip(&input); |
| assert!(stats.is_min_max_backwards_compatible()); |
| assert_eq!(stats.min_opt().unwrap(), &ByteArray::from(f16::NEG_ZERO)); |
| assert_eq!(stats.max_opt().unwrap(), &ByteArray::from(f16::ZERO)); |
| } |
| |
| #[test] |
| fn test_float16_statistics_neg_zero_only() { |
| let input = [f16::NEG_ZERO] |
| .into_iter() |
| .map(|s| ByteArray::from(s).into()) |
| .collect::<Vec<_>>(); |
| |
| let stats = float16_statistics_roundtrip(&input); |
| assert!(stats.is_min_max_backwards_compatible()); |
| assert_eq!(stats.min_opt().unwrap(), &ByteArray::from(f16::NEG_ZERO)); |
| assert_eq!(stats.max_opt().unwrap(), &ByteArray::from(f16::ZERO)); |
| } |
| |
| #[test] |
| fn test_float16_statistics_zero_min() { |
| let input = [f16::ZERO, f16::ONE, f16::NAN, f16::PI] |
| .into_iter() |
| .map(|s| ByteArray::from(s).into()) |
| .collect::<Vec<_>>(); |
| |
| let stats = float16_statistics_roundtrip(&input); |
| assert!(stats.is_min_max_backwards_compatible()); |
| assert_eq!(stats.min_opt().unwrap(), &ByteArray::from(f16::NEG_ZERO)); |
| assert_eq!(stats.max_opt().unwrap(), &ByteArray::from(f16::PI)); |
| } |
| |
| #[test] |
| fn test_float16_statistics_neg_zero_max() { |
| let input = [f16::NEG_ZERO, f16::NEG_ONE, f16::NAN, -f16::PI] |
| .into_iter() |
| .map(|s| ByteArray::from(s).into()) |
| .collect::<Vec<_>>(); |
| |
| let stats = float16_statistics_roundtrip(&input); |
| assert!(stats.is_min_max_backwards_compatible()); |
| assert_eq!(stats.min_opt().unwrap(), &ByteArray::from(-f16::PI)); |
| assert_eq!(stats.max_opt().unwrap(), &ByteArray::from(f16::ZERO)); |
| } |
| |
| #[test] |
| fn test_float_statistics_nan_middle() { |
| let stats = statistics_roundtrip::<FloatType>(&[1.0, f32::NAN, 2.0]); |
| assert!(stats.is_min_max_backwards_compatible()); |
| if let Statistics::Float(stats) = stats { |
| assert_eq!(stats.min_opt().unwrap(), &1.0); |
| assert_eq!(stats.max_opt().unwrap(), &2.0); |
| } else { |
| panic!("expecting Statistics::Float"); |
| } |
| } |
| |
| #[test] |
| fn test_float_statistics_nan_start() { |
| let stats = statistics_roundtrip::<FloatType>(&[f32::NAN, 1.0, 2.0]); |
| assert!(stats.is_min_max_backwards_compatible()); |
| if let Statistics::Float(stats) = stats { |
| assert_eq!(stats.min_opt().unwrap(), &1.0); |
| assert_eq!(stats.max_opt().unwrap(), &2.0); |
| } else { |
| panic!("expecting Statistics::Float"); |
| } |
| } |
| |
| #[test] |
| fn test_float_statistics_nan_only() { |
| let stats = statistics_roundtrip::<FloatType>(&[f32::NAN, f32::NAN]); |
| assert!(stats.min_bytes_opt().is_none()); |
| assert!(stats.max_bytes_opt().is_none()); |
| assert!(stats.is_min_max_backwards_compatible()); |
| assert!(matches!(stats, Statistics::Float(_))); |
| } |
| |
| #[test] |
| fn test_float_statistics_zero_only() { |
| let stats = statistics_roundtrip::<FloatType>(&[0.0]); |
| assert!(stats.is_min_max_backwards_compatible()); |
| if let Statistics::Float(stats) = stats { |
| assert_eq!(stats.min_opt().unwrap(), &-0.0); |
| assert!(stats.min_opt().unwrap().is_sign_negative()); |
| assert_eq!(stats.max_opt().unwrap(), &0.0); |
| assert!(stats.max_opt().unwrap().is_sign_positive()); |
| } else { |
| panic!("expecting Statistics::Float"); |
| } |
| } |
| |
| #[test] |
| fn test_float_statistics_neg_zero_only() { |
| let stats = statistics_roundtrip::<FloatType>(&[-0.0]); |
| assert!(stats.is_min_max_backwards_compatible()); |
| if let Statistics::Float(stats) = stats { |
| assert_eq!(stats.min_opt().unwrap(), &-0.0); |
| assert!(stats.min_opt().unwrap().is_sign_negative()); |
| assert_eq!(stats.max_opt().unwrap(), &0.0); |
| assert!(stats.max_opt().unwrap().is_sign_positive()); |
| } else { |
| panic!("expecting Statistics::Float"); |
| } |
| } |
| |
| #[test] |
| fn test_float_statistics_zero_min() { |
| let stats = statistics_roundtrip::<FloatType>(&[0.0, 1.0, f32::NAN, 2.0]); |
| assert!(stats.is_min_max_backwards_compatible()); |
| if let Statistics::Float(stats) = stats { |
| assert_eq!(stats.min_opt().unwrap(), &-0.0); |
| assert!(stats.min_opt().unwrap().is_sign_negative()); |
| assert_eq!(stats.max_opt().unwrap(), &2.0); |
| } else { |
| panic!("expecting Statistics::Float"); |
| } |
| } |
| |
| #[test] |
| fn test_float_statistics_neg_zero_max() { |
| let stats = statistics_roundtrip::<FloatType>(&[-0.0, -1.0, f32::NAN, -2.0]); |
| assert!(stats.is_min_max_backwards_compatible()); |
| if let Statistics::Float(stats) = stats { |
| assert_eq!(stats.min_opt().unwrap(), &-2.0); |
| assert_eq!(stats.max_opt().unwrap(), &0.0); |
| assert!(stats.max_opt().unwrap().is_sign_positive()); |
| } else { |
| panic!("expecting Statistics::Float"); |
| } |
| } |
| |
| #[test] |
| fn test_double_statistics_nan_middle() { |
| let stats = statistics_roundtrip::<DoubleType>(&[1.0, f64::NAN, 2.0]); |
| assert!(stats.is_min_max_backwards_compatible()); |
| if let Statistics::Double(stats) = stats { |
| assert_eq!(stats.min_opt().unwrap(), &1.0); |
| assert_eq!(stats.max_opt().unwrap(), &2.0); |
| } else { |
| panic!("expecting Statistics::Double"); |
| } |
| } |
| |
| #[test] |
| fn test_double_statistics_nan_start() { |
| let stats = statistics_roundtrip::<DoubleType>(&[f64::NAN, 1.0, 2.0]); |
| assert!(stats.is_min_max_backwards_compatible()); |
| if let Statistics::Double(stats) = stats { |
| assert_eq!(stats.min_opt().unwrap(), &1.0); |
| assert_eq!(stats.max_opt().unwrap(), &2.0); |
| } else { |
| panic!("expecting Statistics::Double"); |
| } |
| } |
| |
| #[test] |
| fn test_double_statistics_nan_only() { |
| let stats = statistics_roundtrip::<DoubleType>(&[f64::NAN, f64::NAN]); |
| assert!(stats.min_bytes_opt().is_none()); |
| assert!(stats.max_bytes_opt().is_none()); |
| assert!(matches!(stats, Statistics::Double(_))); |
| assert!(stats.is_min_max_backwards_compatible()); |
| } |
| |
| #[test] |
| fn test_double_statistics_zero_only() { |
| let stats = statistics_roundtrip::<DoubleType>(&[0.0]); |
| assert!(stats.is_min_max_backwards_compatible()); |
| if let Statistics::Double(stats) = stats { |
| assert_eq!(stats.min_opt().unwrap(), &-0.0); |
| assert!(stats.min_opt().unwrap().is_sign_negative()); |
| assert_eq!(stats.max_opt().unwrap(), &0.0); |
| assert!(stats.max_opt().unwrap().is_sign_positive()); |
| } else { |
| panic!("expecting Statistics::Double"); |
| } |
| } |
| |
| #[test] |
| fn test_double_statistics_neg_zero_only() { |
| let stats = statistics_roundtrip::<DoubleType>(&[-0.0]); |
| assert!(stats.is_min_max_backwards_compatible()); |
| if let Statistics::Double(stats) = stats { |
| assert_eq!(stats.min_opt().unwrap(), &-0.0); |
| assert!(stats.min_opt().unwrap().is_sign_negative()); |
| assert_eq!(stats.max_opt().unwrap(), &0.0); |
| assert!(stats.max_opt().unwrap().is_sign_positive()); |
| } else { |
| panic!("expecting Statistics::Double"); |
| } |
| } |
| |
| #[test] |
| fn test_double_statistics_zero_min() { |
| let stats = statistics_roundtrip::<DoubleType>(&[0.0, 1.0, f64::NAN, 2.0]); |
| assert!(stats.is_min_max_backwards_compatible()); |
| if let Statistics::Double(stats) = stats { |
| assert_eq!(stats.min_opt().unwrap(), &-0.0); |
| assert!(stats.min_opt().unwrap().is_sign_negative()); |
| assert_eq!(stats.max_opt().unwrap(), &2.0); |
| } else { |
| panic!("expecting Statistics::Double"); |
| } |
| } |
| |
| #[test] |
| fn test_double_statistics_neg_zero_max() { |
| let stats = statistics_roundtrip::<DoubleType>(&[-0.0, -1.0, f64::NAN, -2.0]); |
| assert!(stats.is_min_max_backwards_compatible()); |
| if let Statistics::Double(stats) = stats { |
| assert_eq!(stats.min_opt().unwrap(), &-2.0); |
| assert_eq!(stats.max_opt().unwrap(), &0.0); |
| assert!(stats.max_opt().unwrap().is_sign_positive()); |
| } else { |
| panic!("expecting Statistics::Double"); |
| } |
| } |
| |
| #[test] |
| fn test_compare_greater_byte_array_decimals() { |
| assert!(!compare_greater_byte_array_decimals(&[], &[]));
| assert!(compare_greater_byte_array_decimals(&[1u8], &[]));
| assert!(!compare_greater_byte_array_decimals(&[], &[1u8]));
| assert!(compare_greater_byte_array_decimals(&[1u8], &[0u8]));
| assert!(!compare_greater_byte_array_decimals(&[1u8], &[1u8]));
| assert!(compare_greater_byte_array_decimals(&[1u8, 0u8], &[0u8]));
| assert!(!compare_greater_byte_array_decimals(&[0u8, 1u8], &[1u8, 0u8]));
| assert!(!compare_greater_byte_array_decimals(&[255u8, 35u8, 0u8, 0u8], &[0u8]));
| assert!(compare_greater_byte_array_decimals(&[0u8], &[255u8, 35u8, 0u8, 0u8]));
| } |
| |
| #[test] |
| fn test_column_index_with_null_pages() { |
| // write a single page of all nulls |
| let page_writer = get_test_page_writer(); |
| let props = Default::default(); |
| let mut writer = get_test_column_writer::<Int32Type>(page_writer, 1, 0, props); |
| writer.write_batch(&[], Some(&[0, 0, 0, 0]), None).unwrap(); |
| |
| let r = writer.close().unwrap(); |
| assert!(r.column_index.is_some()); |
| let col_idx = r.column_index.unwrap(); |
| let col_idx = match col_idx { |
| ColumnIndexMetaData::INT32(col_idx) => col_idx, |
| _ => panic!("wrong stats type"), |
| }; |
| // null_pages should be true for page 0 |
| assert!(col_idx.is_null_page(0)); |
| // min and max should be absent for an all-null page
| assert!(col_idx.min_value(0).is_none()); |
| assert!(col_idx.max_value(0).is_none()); |
| // null_counts should be defined and be 4 for page 0 |
| assert!(col_idx.null_count(0).is_some()); |
| assert_eq!(col_idx.null_count(0), Some(4)); |
| // there is no repetition so rep histogram should be absent |
| assert!(col_idx.repetition_level_histogram(0).is_none()); |
| // definition_level_histogram should be present and should be 0:4, 1:0 |
| assert!(col_idx.definition_level_histogram(0).is_some()); |
| assert_eq!(col_idx.definition_level_histogram(0).unwrap(), &[4, 0]); |
| } |
| |
| #[test] |
| fn test_column_offset_index_metadata() { |
| // write data |
| // and check the offset index and column index |
| let page_writer = get_test_page_writer(); |
| let props = Default::default(); |
| let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props); |
| writer.write_batch(&[1, 2, 3, 4], None, None).unwrap(); |
| // first page |
| writer.flush_data_pages().unwrap(); |
| // second page |
| writer.write_batch(&[4, 8, 2, -5], None, None).unwrap(); |
| |
| let r = writer.close().unwrap(); |
| let column_index = r.column_index.unwrap(); |
| let offset_index = r.offset_index.unwrap(); |
| |
| assert_eq!(8, r.rows_written); |
| |
| // column index |
| let column_index = match column_index { |
| ColumnIndexMetaData::INT32(column_index) => column_index, |
| _ => panic!("wrong stats type"), |
| }; |
| assert_eq!(2, column_index.num_pages()); |
| assert_eq!(2, offset_index.page_locations.len()); |
| assert_eq!(BoundaryOrder::UNORDERED, column_index.boundary_order); |
| for idx in 0..2 { |
| assert!(!column_index.is_null_page(idx)); |
| assert_eq!(0, column_index.null_count(idx).unwrap());
| } |
| |
| if let Some(stats) = r.metadata.statistics() { |
| assert_eq!(stats.null_count_opt(), Some(0)); |
| assert_eq!(stats.distinct_count_opt(), None); |
| if let Statistics::Int32(stats) = stats { |
| // first page is [1,2,3,4] |
| // second page is [-5,2,4,8] |
| // note that we don't increment here, as this is not a byte array type.
| assert_eq!(stats.min_opt(), column_index.min_value(1)); |
| assert_eq!(stats.max_opt(), column_index.max_value(1)); |
| } else { |
| panic!("expecting Statistics::Int32"); |
| } |
| } else { |
| panic!("metadata missing statistics"); |
| } |
| |
| // page location |
| assert_eq!(0, offset_index.page_locations[0].first_row_index); |
| assert_eq!(4, offset_index.page_locations[1].first_row_index); |
| } |
| |
| /// Verify min/max value truncation in the column index works as expected |
| #[test] |
| fn test_column_offset_index_metadata_truncating() { |
| // write data |
| // and check the offset index and column index |
| let page_writer = get_test_page_writer(); |
| let props = WriterProperties::builder() |
| .set_statistics_truncate_length(None) // disable statistics truncation; column index truncation keeps its default
| .build() |
| .into(); |
| let mut writer = get_test_column_writer::<FixedLenByteArrayType>(page_writer, 0, 0, props); |
| |
| let mut data = vec![FixedLenByteArray::default(); 3]; |
| // This is the expected min value - "aaa..." |
| data[0].set_data(Bytes::from(vec![97_u8; 200])); |
| // This is the expected max value - "ppp..." (112 is ASCII 'p')
| data[1].set_data(Bytes::from(vec![112_u8; 200])); |
| data[2].set_data(Bytes::from(vec![98_u8; 200])); |
| |
| writer.write_batch(&data, None, None).unwrap(); |
| |
| writer.flush_data_pages().unwrap(); |
| |
| let r = writer.close().unwrap(); |
| let column_index = r.column_index.unwrap(); |
| let offset_index = r.offset_index.unwrap(); |
| |
| let column_index = match column_index { |
| ColumnIndexMetaData::FIXED_LEN_BYTE_ARRAY(column_index) => column_index, |
| _ => panic!("wrong stats type"), |
| }; |
| |
| assert_eq!(3, r.rows_written); |
| |
| // column index |
| assert_eq!(1, column_index.num_pages()); |
| assert_eq!(1, offset_index.page_locations.len()); |
| assert_eq!(BoundaryOrder::ASCENDING, column_index.boundary_order); |
| assert!(!column_index.is_null_page(0)); |
| assert_eq!(Some(0), column_index.null_count(0)); |
| |
| if let Some(stats) = r.metadata.statistics() { |
| assert_eq!(stats.null_count_opt(), Some(0)); |
| assert_eq!(stats.distinct_count_opt(), None); |
| if let Statistics::FixedLenByteArray(stats) = stats { |
| let column_index_min_value = column_index.min_value(0).unwrap(); |
| let column_index_max_value = column_index.max_value(0).unwrap(); |
| |
| // Column index stats are truncated, while the column chunk's aren't. |
| assert_ne!(stats.min_bytes_opt().unwrap(), column_index_min_value); |
| assert_ne!(stats.max_bytes_opt().unwrap(), column_index_max_value); |
| |
| assert_eq!( |
| column_index_min_value.len(), |
| DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH.unwrap() |
| ); |
| assert_eq!(column_index_min_value, &[97_u8; 64]); |
| assert_eq!( |
| column_index_max_value.len(), |
| DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH.unwrap() |
| ); |
| |
| // We expect the last byte to be incremented |
| assert_eq!( |
| *column_index_max_value.last().unwrap(), |
| *column_index_max_value.first().unwrap() + 1 |
| ); |
| } else { |
| panic!("expecting Statistics::FixedLenByteArray"); |
| } |
| } else { |
| panic!("metadata missing statistics"); |
| } |
| } |
| |
| #[test] |
| fn test_column_offset_index_truncating_spec_example() { |
| // write data |
| // and check the offset index and column index |
| let page_writer = get_test_page_writer(); |
| |
| // Truncate values at 1 byte |
| let builder = WriterProperties::builder().set_column_index_truncate_length(Some(1)); |
| let props = Arc::new(builder.build()); |
| let mut writer = get_test_column_writer::<FixedLenByteArrayType>(page_writer, 0, 0, props); |
| |
| let mut data = vec![FixedLenByteArray::default(); 1]; |
| // This is the expected min value |
| data[0].set_data(Bytes::from(String::from("Blart Versenwald III"))); |
| |
| writer.write_batch(&data, None, None).unwrap(); |
| |
| writer.flush_data_pages().unwrap(); |
| |
| let r = writer.close().unwrap(); |
| let column_index = r.column_index.unwrap(); |
| let offset_index = r.offset_index.unwrap(); |
| |
| let column_index = match column_index { |
| ColumnIndexMetaData::FIXED_LEN_BYTE_ARRAY(column_index) => column_index, |
| _ => panic!("wrong stats type"), |
| }; |
| |
| assert_eq!(1, r.rows_written); |
| |
| // column index |
| assert_eq!(1, column_index.num_pages()); |
| assert_eq!(1, offset_index.page_locations.len()); |
| assert_eq!(BoundaryOrder::ASCENDING, column_index.boundary_order); |
| assert!(!column_index.is_null_page(0)); |
| assert_eq!(Some(0), column_index.null_count(0)); |
| |
| if let Some(stats) = r.metadata.statistics() { |
| assert_eq!(stats.null_count_opt(), Some(0)); |
| assert_eq!(stats.distinct_count_opt(), None); |
| if let Statistics::FixedLenByteArray(_stats) = stats { |
| let column_index_min_value = column_index.min_value(0).unwrap(); |
| let column_index_max_value = column_index.max_value(0).unwrap(); |
| |
| assert_eq!(column_index_min_value.len(), 1); |
| assert_eq!(column_index_max_value.len(), 1); |
| |
| assert_eq!("B".as_bytes(), column_index_min_value); |
| assert_eq!("C".as_bytes(), column_index_max_value); |
| |
| assert_ne!(column_index_min_value, stats.min_bytes_opt().unwrap()); |
| assert_ne!(column_index_max_value, stats.max_bytes_opt().unwrap()); |
| } else { |
| panic!("expecting Statistics::FixedLenByteArray"); |
| } |
| } else { |
| panic!("metadata missing statistics"); |
| } |
| } |
| |
| #[test] |
| fn test_float16_min_max_no_truncation() { |
| // Even if we set truncation to occur at 1 byte, we should not truncate for Float16 |
| let builder = WriterProperties::builder().set_column_index_truncate_length(Some(1)); |
| let props = Arc::new(builder.build()); |
| let page_writer = get_test_page_writer(); |
| let mut writer = get_test_float16_column_writer(page_writer, props); |
| |
| let expected_value = f16::PI.to_le_bytes().to_vec(); |
| let data = vec![ByteArray::from(expected_value.clone()).into()]; |
| writer.write_batch(&data, None, None).unwrap(); |
| writer.flush_data_pages().unwrap(); |
| |
| let r = writer.close().unwrap(); |
| |
| // stats should still be written |
| // ensure bytes weren't truncated for column index |
| let column_index = r.column_index.unwrap(); |
| let column_index = match column_index { |
| ColumnIndexMetaData::FIXED_LEN_BYTE_ARRAY(column_index) => column_index, |
| _ => panic!("wrong stats type"), |
| }; |
| let column_index_min_bytes = column_index.min_value(0).unwrap(); |
| let column_index_max_bytes = column_index.max_value(0).unwrap(); |
| assert_eq!(expected_value, column_index_min_bytes); |
| assert_eq!(expected_value, column_index_max_bytes); |
| |
| // ensure bytes weren't truncated for statistics |
| let stats = r.metadata.statistics().unwrap(); |
| if let Statistics::FixedLenByteArray(stats) = stats { |
| let stats_min_bytes = stats.min_bytes_opt().unwrap(); |
| let stats_max_bytes = stats.max_bytes_opt().unwrap(); |
| assert_eq!(expected_value, stats_min_bytes); |
| assert_eq!(expected_value, stats_max_bytes); |
| } else { |
| panic!("expecting Statistics::FixedLenByteArray"); |
| } |
| } |
| |
| #[test] |
| fn test_decimal_min_max_no_truncation() { |
| // Even if we set truncation to occur at 1 byte, we should not truncate for Decimal |
| let builder = WriterProperties::builder().set_column_index_truncate_length(Some(1)); |
| let props = Arc::new(builder.build()); |
| let page_writer = get_test_page_writer(); |
| let mut writer = |
| get_test_decimals_column_writer::<FixedLenByteArrayType>(page_writer, 0, 0, props); |
| |
| let expected_value = vec![ |
| 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 179u8, 172u8, 19u8, 35u8, |
| 231u8, 90u8, 0u8, 0u8, |
| ]; |
| let data = vec![ByteArray::from(expected_value.clone()).into()]; |
| writer.write_batch(&data, None, None).unwrap(); |
| writer.flush_data_pages().unwrap(); |
| |
| let r = writer.close().unwrap(); |
| |
| // stats should still be written |
| // ensure bytes weren't truncated for column index |
| let column_index = r.column_index.unwrap(); |
| let column_index = match column_index { |
| ColumnIndexMetaData::FIXED_LEN_BYTE_ARRAY(column_index) => column_index, |
| _ => panic!("wrong stats type"), |
| }; |
| let column_index_min_bytes = column_index.min_value(0).unwrap(); |
| let column_index_max_bytes = column_index.max_value(0).unwrap(); |
| assert_eq!(expected_value, column_index_min_bytes); |
| assert_eq!(expected_value, column_index_max_bytes); |
| |
| // ensure bytes weren't truncated for statistics |
| let stats = r.metadata.statistics().unwrap(); |
| if let Statistics::FixedLenByteArray(stats) = stats { |
| let stats_min_bytes = stats.min_bytes_opt().unwrap(); |
| let stats_max_bytes = stats.max_bytes_opt().unwrap(); |
| assert_eq!(expected_value, stats_min_bytes); |
| assert_eq!(expected_value, stats_max_bytes); |
| } else { |
| panic!("expecting Statistics::FixedLenByteArray"); |
| } |
| } |
| |
| #[test] |
| fn test_statistics_truncating_byte_array_default() { |
| let page_writer = get_test_page_writer(); |
| |
| // The default truncate length is 64 bytes |
| let props = WriterProperties::builder().build().into(); |
| let mut writer = get_test_column_writer::<ByteArrayType>(page_writer, 0, 0, props); |
| |
| let mut data = vec![ByteArray::default(); 1]; |
| data[0].set_data(Bytes::from(String::from( |
| "This string is longer than 64 bytes, so it will almost certainly be truncated.", |
| ))); |
| writer.write_batch(&data, None, None).unwrap(); |
| writer.flush_data_pages().unwrap(); |
| |
| let r = writer.close().unwrap(); |
| |
| assert_eq!(1, r.rows_written); |
| |
| let stats = r.metadata.statistics().expect("statistics"); |
| if let Statistics::ByteArray(stats) = stats {
| let min_value = stats.min_opt().unwrap();
| let max_value = stats.max_opt().unwrap();
|
| assert!(!stats.min_is_exact());
| assert!(!stats.max_is_exact());
| |
| let expected_len = 64; |
| assert_eq!(min_value.len(), expected_len); |
| assert_eq!(max_value.len(), expected_len); |
| |
| let expected_min = |
| "This string is longer than 64 bytes, so it will almost certainly".as_bytes(); |
| assert_eq!(expected_min, min_value.as_bytes()); |
| // note the max value is different from the min value: the last byte is incremented |
| let expected_max = |
| "This string is longer than 64 bytes, so it will almost certainlz".as_bytes(); |
| assert_eq!(expected_max, max_value.as_bytes()); |
| } else { |
| panic!("expecting Statistics::ByteArray"); |
| } |
| } |
| |
| #[test] |
| fn test_statistics_truncating_byte_array() { |
| let page_writer = get_test_page_writer(); |
| |
| const TEST_TRUNCATE_LENGTH: usize = 1; |
| |
| // Truncate values at 1 byte |
| let builder = |
| WriterProperties::builder().set_statistics_truncate_length(Some(TEST_TRUNCATE_LENGTH)); |
| let props = Arc::new(builder.build()); |
| let mut writer = get_test_column_writer::<ByteArrayType>(page_writer, 0, 0, props); |
| |
| let mut data = vec![ByteArray::default(); 1]; |
| // This is the expected min value |
| data[0].set_data(Bytes::from(String::from("Blart Versenwald III"))); |
| |
| writer.write_batch(&data, None, None).unwrap(); |
| |
| writer.flush_data_pages().unwrap(); |
| |
| let r = writer.close().unwrap(); |
| |
| assert_eq!(1, r.rows_written); |
| |
| let stats = r.metadata.statistics().expect("statistics"); |
| assert_eq!(stats.null_count_opt(), Some(0)); |
| assert_eq!(stats.distinct_count_opt(), None); |
| if let Statistics::ByteArray(stats) = stats {
| let min_value = stats.min_opt().unwrap();
| let max_value = stats.max_opt().unwrap();
|
| assert!(!stats.min_is_exact());
| assert!(!stats.max_is_exact());
| |
| assert_eq!(min_value.len(), TEST_TRUNCATE_LENGTH); |
| assert_eq!(max_value.len(), TEST_TRUNCATE_LENGTH); |
| |
| assert_eq!("B".as_bytes(), min_value.as_bytes()); |
| assert_eq!("C".as_bytes(), max_value.as_bytes()); |
| } else { |
| panic!("expecting Statistics::ByteArray"); |
| } |
| } |
| |
| #[test] |
| fn test_statistics_truncating_fixed_len_byte_array() { |
| let page_writer = get_test_page_writer(); |
| |
| const TEST_TRUNCATE_LENGTH: usize = 1; |
| |
| // Truncate values at 1 byte |
| let builder = |
| WriterProperties::builder().set_statistics_truncate_length(Some(TEST_TRUNCATE_LENGTH)); |
| let props = Arc::new(builder.build()); |
| let mut writer = get_test_column_writer::<FixedLenByteArrayType>(page_writer, 0, 0, props); |
| |
| let mut data = vec![FixedLenByteArray::default(); 1]; |
| |
| const PSEUDO_DECIMAL_VALUE: i128 = 6541894651216648486512564456564654; |
| const PSEUDO_DECIMAL_BYTES: [u8; 16] = PSEUDO_DECIMAL_VALUE.to_be_bytes(); |
| |
| const EXPECTED_MIN: [u8; TEST_TRUNCATE_LENGTH] = [PSEUDO_DECIMAL_BYTES[0]]; // parquet specifies big-endian order for decimals |
| const EXPECTED_MAX: [u8; TEST_TRUNCATE_LENGTH] = |
| [PSEUDO_DECIMAL_BYTES[0].overflowing_add(1).0]; |
| |
| // This is the expected min value |
| data[0].set_data(Bytes::from(PSEUDO_DECIMAL_BYTES.as_slice())); |
| |
| writer.write_batch(&data, None, None).unwrap(); |
| |
| writer.flush_data_pages().unwrap(); |
| |
| let r = writer.close().unwrap(); |
| |
| assert_eq!(1, r.rows_written); |
| |
| let stats = r.metadata.statistics().expect("statistics"); |
| assert_eq!(stats.null_count_opt(), Some(0)); |
| assert_eq!(stats.distinct_count_opt(), None); |
| if let Statistics::FixedLenByteArray(stats) = stats { |
| let min_value = stats.min_opt().unwrap(); |
| let max_value = stats.max_opt().unwrap(); |
| |
| assert!(!stats.min_is_exact()); |
| assert!(!stats.max_is_exact()); |
| |
| assert_eq!(min_value.len(), TEST_TRUNCATE_LENGTH); |
| assert_eq!(max_value.len(), TEST_TRUNCATE_LENGTH); |
| |
| assert_eq!(EXPECTED_MIN.as_slice(), min_value.as_bytes()); |
| assert_eq!(EXPECTED_MAX.as_slice(), max_value.as_bytes()); |
| |
| // Reconstruct the i128 values from the single truncated byte, |
| // zero-padding the remaining 15 bytes |
| let mut min_bytes = [0u8; 16]; |
| min_bytes[0] = min_value.as_bytes()[0]; |
| let reconstructed_min = i128::from_be_bytes(min_bytes); |
| |
| let mut max_bytes = [0u8; 16]; |
| max_bytes[0] = max_value.as_bytes()[0]; |
| let reconstructed_max = i128::from_be_bytes(max_bytes); |
| |
| // check that the inner value is correctly bounded by the min/max |
| println!("min: {reconstructed_min} {PSEUDO_DECIMAL_VALUE}"); |
| assert!(reconstructed_min <= PSEUDO_DECIMAL_VALUE); |
| println!("max: {reconstructed_max} {PSEUDO_DECIMAL_VALUE}"); |
| assert!(reconstructed_max >= PSEUDO_DECIMAL_VALUE); |
| } else { |
| panic!("expecting Statistics::FixedLenByteArray"); |
| } |
| } |
| |
| #[test] |
| fn test_send() { |
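| // Compile-time check that ColumnWriterImpl is Send and can therefore be |
| // moved across threads |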
| fn test<T: Send>() {} |
| test::<ColumnWriterImpl<Int32Type>>(); |
| } |
| |
| #[test] |
| fn test_increment() { |
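| // `increment` treats the byte slice as a big-endian unsigned integer and |
| // adds one; this is how truncated max statistics are kept as valid upper |
| // bounds |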
| let v = increment(vec![0, 0, 0]).unwrap(); |
| assert_eq!(&v, &[0, 0, 1]); |
| |
| // Handle overflow |
| let v = increment(vec![0, 255, 255]).unwrap(); |
| assert_eq!(&v, &[1, 0, 0]); |
| |
| // Return `None` if all bytes are u8::MAX |
| let v = increment(vec![255, 255, 255]); |
| assert!(v.is_none()); |
| } |
| |
| #[test] |
| fn test_increment_utf8() { |
| let test_inc = |o: &str, expected: &str| { |
| if let Ok(v) = String::from_utf8(increment_utf8(o).unwrap()) { |
| // Got the expected result... |
| assert_eq!(v, expected); |
| // and it's greater than the original string |
| assert!(*v > *o); |
| // Also show that ByteArray-level comparison works here |
| let mut greater = ByteArray::new(); |
| greater.set_data(Bytes::from(v)); |
| let mut original = ByteArray::new(); |
| original.set_data(Bytes::from(o.as_bytes().to_vec())); |
| assert!(greater > original); |
| } else { |
| panic!("Expected incremented UTF8 string to also be valid."); |
| } |
| }; |
| |
| // Basic ASCII case |
| test_inc("hello", "hellp"); |
| |
| // 1-byte ending in max 1-byte |
| test_inc("a\u{7f}", "b"); |
| |
| // A string of all max 1-byte code points cannot be incremented, as that would need 2-byte code points |
| assert!(increment_utf8("\u{7f}\u{7f}").is_none()); |
| |
| // UTF8 string |
| test_inc("❤️🧡💛💚💙💜", "❤️🧡💛💚💙💝"); |
| |
| // 2-byte without overflow |
| test_inc("éééé", "éééê"); |
| |
| // 2-byte that overflows lowest byte |
| test_inc("\u{ff}\u{ff}", "\u{ff}\u{100}"); |
| |
| // 2-byte ending in max 2-byte |
| test_inc("a\u{7ff}", "b"); |
| |
| // A string of all max 2-byte code points cannot be incremented, as that would need 3-byte code points |
| assert!(increment_utf8("\u{7ff}\u{7ff}").is_none()); |
| |
| // 3-byte without overflow [U+800, U+800] -> [U+800, U+801] (note that these |
| // characters should render right to left). |
| test_inc("ࠀࠀ", "ࠀࠁ"); |
| |
| // 3-byte ending in max 3-byte |
| test_inc("a\u{ffff}", "b"); |
| |
| // A string of all max 3-byte code points cannot be incremented, as that would need 4-byte code points |
| assert!(increment_utf8("\u{ffff}\u{ffff}").is_none()); |
| |
| // 4-byte without overflow |
| test_inc("𐀀𐀀", "𐀀𐀁"); |
| |
| // 4-byte ending in max unicode |
| test_inc("a\u{10ffff}", "b"); |
| |
| // A string of max code points (U+10FFFF) cannot be incremented at all |
| assert!(increment_utf8("\u{10ffff}\u{10ffff}").is_none()); |
| |
| // Incrementing U+D7FF would land in the surrogate pair range |
| // (0xD800..=0xDFFF), which is invalid UTF-8, so the preceding character |
| // is incremented instead |
| test_inc("a\u{D7FF}", "b"); |
| } |
| |
| #[test] |
| fn test_truncate_utf8() { |
| // No-op |
| let data = "❤️🧡💛💚💙💜"; |
| let r = truncate_utf8(data, data.len()).unwrap(); |
| assert_eq!(r.len(), data.len()); |
| assert_eq!(&r, data.as_bytes()); |
| |
| // Truncating to 13 bytes would split a code point, so the result is cut |
| // back to the previous UTF-8 boundary (10 bytes) |
| let r = truncate_utf8(data, 13).unwrap(); |
| assert_eq!(r.len(), 10); |
| assert_eq!(&r, "❤️🧡".as_bytes()); |
| |
| // One multi-byte code point, and a length shorter than it, so we can't slice it |
| let r = truncate_utf8("\u{0836}", 1); |
| assert!(r.is_none()); |
| |
| // Test truncate and increment for max bounds on UTF-8 statistics |
| // 7-bit (i.e. ASCII) |
| let r = truncate_and_increment_utf8("yyyyyyyyy", 8).unwrap(); |
| assert_eq!(&r, "yyyyyyyz".as_bytes()); |
| |
| // 2-byte without overflow |
| let r = truncate_and_increment_utf8("ééééé", 7).unwrap(); |
| assert_eq!(&r, "ééê".as_bytes()); |
| |
| // 2-byte that overflows lowest byte |
| let r = truncate_and_increment_utf8("\u{ff}\u{ff}\u{ff}\u{ff}\u{ff}", 8).unwrap(); |
| assert_eq!(&r, "\u{ff}\u{ff}\u{ff}\u{100}".as_bytes()); |
| |
| // All-max 2-byte code points cannot be incremented after truncation, as that would need 3-byte code points |
| let r = truncate_and_increment_utf8("߿߿߿߿߿", 8); |
| assert!(r.is_none()); |
| |
| // 3-byte without overflow [U+800, U+800, U+800, U+800] -> [U+800, U+801] |
| // (note that these characters should render right to left). |
| let r = truncate_and_increment_utf8("ࠀࠀࠀࠀ", 8).unwrap(); |
| assert_eq!(&r, "ࠀࠁ".as_bytes()); |
| |
| // All-max 3-byte code points cannot be incremented after truncation, as that would need 4-byte code points |
| let r = truncate_and_increment_utf8("\u{ffff}\u{ffff}\u{ffff}", 8); |
| assert!(r.is_none()); |
| |
| // 4-byte without overflow |
| let r = truncate_and_increment_utf8("𐀀𐀀𐀀𐀀", 9).unwrap(); |
| assert_eq!(&r, "𐀀𐀁".as_bytes()); |
| |
| // All-max code points (U+10FFFF) cannot be incremented at all |
| let r = truncate_and_increment_utf8("\u{10ffff}\u{10ffff}", 8); |
| assert!(r.is_none()); |
| } |
| |
| #[test] |
| // Check fallback truncation of statistics that should be UTF-8, but aren't |
| // (see https://github.com/apache/arrow-rs/pull/6870). |
| fn test_byte_array_truncate_invalid_utf8_statistics() { |
| let message_type = " |
| message test_schema { |
| OPTIONAL BYTE_ARRAY a (UTF8); |
| } |
| "; |
| let schema = Arc::new(parse_message_type(message_type).unwrap()); |
| |
| // Create Vec<ByteArray> containing non-UTF8 bytes |
| let data = vec![ByteArray::from(vec![128u8; 32]); 7]; |
| let def_levels = [1, 1, 1, 1, 0, 1, 0, 1, 0, 1]; |
| let file: File = tempfile::tempfile().unwrap(); |
| let props = Arc::new( |
| WriterProperties::builder() |
| .set_statistics_enabled(EnabledStatistics::Chunk) |
| .set_statistics_truncate_length(Some(8)) |
| .build(), |
| ); |
| |
| let mut writer = SerializedFileWriter::new(&file, schema, props).unwrap(); |
| let mut row_group_writer = writer.next_row_group().unwrap(); |
| |
| let mut col_writer = row_group_writer.next_column().unwrap().unwrap(); |
| col_writer |
| .typed::<ByteArrayType>() |
| .write_batch(&data, Some(&def_levels), None) |
| .unwrap(); |
| col_writer.close().unwrap(); |
| row_group_writer.close().unwrap(); |
| let file_metadata = writer.close().unwrap(); |
| let stats = file_metadata.row_group(0).column(0).statistics().unwrap(); |
| assert!(!stats.max_is_exact()); |
| // Truncation of invalid UTF-8 should fall back to binary truncation, so last byte should |
| // be incremented by 1. |
| assert_eq!( |
| stats.max_bytes_opt().map(|v| v.to_vec()), |
| Some([128, 128, 128, 128, 128, 128, 128, 129].to_vec()) |
| ); |
| } |
| |
| #[test] |
| fn test_increment_max_binary_chars() { |
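| // Trailing 0xFF bytes wrap to 0x00 and the carry increments the first |
| // non-0xFF byte from the right (0xFD -> 0xFE) |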
| let r = increment(vec![0xFF, 0xFE, 0xFD, 0xFF, 0xFF]); |
| assert_eq!(&r.unwrap(), &[0xFF, 0xFE, 0xFE, 0x00, 0x00]); |
| |
| let incremented = increment(vec![0xFF, 0xFF, 0xFF]); |
| assert!(incremented.is_none()) |
| } |
| |
| #[test] |
| fn test_no_column_index_when_stats_disabled() { |
| // https://github.com/apache/arrow-rs/issues/6010 |
| // Test that column index is not created/written for all-nulls column when page |
| // statistics are disabled. |
| let descr = Arc::new(get_test_column_descr::<Int32Type>(1, 0)); |
| let props = Arc::new( |
| WriterProperties::builder() |
| .set_statistics_enabled(EnabledStatistics::None) |
| .build(), |
| ); |
| let column_writer = get_column_writer(descr, props, get_test_page_writer()); |
| let mut writer = get_typed_column_writer::<Int32Type>(column_writer); |
| |
| let data = Vec::new(); |
| let def_levels = vec![0; 10]; |
| writer.write_batch(&data, Some(&def_levels), None).unwrap(); |
| writer.flush_data_pages().unwrap(); |
| |
| let column_close_result = writer.close().unwrap(); |
| assert!(column_close_result.offset_index.is_some()); |
| assert!(column_close_result.column_index.is_none()); |
| } |
| |
| #[test] |
| fn test_no_offset_index_when_disabled() { |
| // Test that offset indexes can be disabled |
| let descr = Arc::new(get_test_column_descr::<Int32Type>(1, 0)); |
| let props = Arc::new( |
| WriterProperties::builder() |
| .set_statistics_enabled(EnabledStatistics::None) |
| .set_offset_index_disabled(true) |
| .build(), |
| ); |
| let column_writer = get_column_writer(descr, props, get_test_page_writer()); |
| let mut writer = get_typed_column_writer::<Int32Type>(column_writer); |
| |
| let data = Vec::new(); |
| let def_levels = vec![0; 10]; |
| writer.write_batch(&data, Some(&def_levels), None).unwrap(); |
| writer.flush_data_pages().unwrap(); |
| |
| let column_close_result = writer.close().unwrap(); |
| assert!(column_close_result.offset_index.is_none()); |
| assert!(column_close_result.column_index.is_none()); |
| } |
| |
| #[test] |
| fn test_offset_index_overridden() { |
| // Test that offset indexes are not disabled when gathering page statistics |
| let descr = Arc::new(get_test_column_descr::<Int32Type>(1, 0)); |
| let props = Arc::new( |
| WriterProperties::builder() |
| .set_statistics_enabled(EnabledStatistics::Page) |
| .set_offset_index_disabled(true) |
| .build(), |
| ); |
| let column_writer = get_column_writer(descr, props, get_test_page_writer()); |
| let mut writer = get_typed_column_writer::<Int32Type>(column_writer); |
| |
| let data = Vec::new(); |
| let def_levels = vec![0; 10]; |
| writer.write_batch(&data, Some(&def_levels), None).unwrap(); |
| writer.flush_data_pages().unwrap(); |
| |
| let column_close_result = writer.close().unwrap(); |
| assert!(column_close_result.offset_index.is_some()); |
| assert!(column_close_result.column_index.is_some()); |
| } |
| |
| #[test] |
| fn test_boundary_order() -> Result<()> { |
| let descr = Arc::new(get_test_column_descr::<Int32Type>(1, 0)); |
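| // BoundaryOrder records whether the per-page min/max values in the column |
| // index are sorted, which lets readers binary search pages instead of |
| // scanning them |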
| // min max both ascending |
| let column_close_result = write_multiple_pages::<Int32Type>( |
| &descr, |
| &[ |
| &[Some(-10), Some(10)], |
| &[Some(-5), Some(11)], |
| &[None], |
| &[Some(-5), Some(11)], |
| ], |
| )?; |
| let boundary_order = column_close_result |
| .column_index |
| .unwrap() |
| .get_boundary_order(); |
| assert_eq!(boundary_order, Some(BoundaryOrder::ASCENDING)); |
| |
| // min max both descending |
| let column_close_result = write_multiple_pages::<Int32Type>( |
| &descr, |
| &[ |
| &[Some(10), Some(11)], |
| &[Some(5), Some(11)], |
| &[None], |
| &[Some(-5), Some(0)], |
| ], |
| )?; |
| let boundary_order = column_close_result |
| .column_index |
| .unwrap() |
| .get_boundary_order(); |
| assert_eq!(boundary_order, Some(BoundaryOrder::DESCENDING)); |
| |
| // min max both equal |
| let column_close_result = write_multiple_pages::<Int32Type>( |
| &descr, |
| &[&[Some(10), Some(11)], &[None], &[Some(10), Some(11)]], |
| )?; |
| let boundary_order = column_close_result |
| .column_index |
| .unwrap() |
| .get_boundary_order(); |
| assert_eq!(boundary_order, Some(BoundaryOrder::ASCENDING)); |
| |
| // only nulls |
| let column_close_result = |
| write_multiple_pages::<Int32Type>(&descr, &[&[None], &[None], &[None]])?; |
| let boundary_order = column_close_result |
| .column_index |
| .unwrap() |
| .get_boundary_order(); |
| assert_eq!(boundary_order, Some(BoundaryOrder::ASCENDING)); |
| |
| // one page |
| let column_close_result = |
| write_multiple_pages::<Int32Type>(&descr, &[&[Some(-10), Some(10)]])?; |
| let boundary_order = column_close_result |
| .column_index |
| .unwrap() |
| .get_boundary_order(); |
| assert_eq!(boundary_order, Some(BoundaryOrder::ASCENDING)); |
| |
| // one non-null page |
| let column_close_result = |
| write_multiple_pages::<Int32Type>(&descr, &[&[Some(-10), Some(10)], &[None]])?; |
| let boundary_order = column_close_result |
| .column_index |
| .unwrap() |
| .get_boundary_order(); |
| assert_eq!(boundary_order, Some(BoundaryOrder::ASCENDING)); |
| |
| // min max both unordered |
| let column_close_result = write_multiple_pages::<Int32Type>( |
| &descr, |
| &[ |
| &[Some(10), Some(11)], |
| &[Some(11), Some(16)], |
| &[None], |
| &[Some(-5), Some(0)], |
| ], |
| )?; |
| let boundary_order = column_close_result |
| .column_index |
| .unwrap() |
| .get_boundary_order(); |
| assert_eq!(boundary_order, Some(BoundaryOrder::UNORDERED)); |
| |
| // min max both ordered in different orders |
| let column_close_result = write_multiple_pages::<Int32Type>( |
| &descr, |
| &[ |
| &[Some(1), Some(9)], |
| &[Some(2), Some(8)], |
| &[None], |
| &[Some(3), Some(7)], |
| ], |
| )?; |
| let boundary_order = column_close_result |
| .column_index |
| .unwrap() |
| .get_boundary_order(); |
| assert_eq!(boundary_order, Some(BoundaryOrder::UNORDERED)); |
| |
| Ok(()) |
| } |
| |
| #[test] |
| fn test_boundary_order_logical_type() -> Result<()> { |
| // Ensure that logical types whose sort order differs from that of the |
| // underlying physical type's raw bytes are handled correctly |
| let f16_descr = Arc::new(get_test_float16_column_descr(1, 0)); |
| let fba_descr = { |
| let tpe = SchemaType::primitive_type_builder( |
| "col", |
| FixedLenByteArrayType::get_physical_type(), |
| ) |
| .with_length(2) |
| .build()?; |
| Arc::new(ColumnDescriptor::new( |
| Arc::new(tpe), |
| 1, |
| 0, |
| ColumnPath::from("col"), |
| )) |
| }; |
| |
| let values: &[&[Option<FixedLenByteArray>]] = &[ |
| &[Some(FixedLenByteArray::from(ByteArray::from(f16::ONE)))], |
| &[Some(FixedLenByteArray::from(ByteArray::from(f16::ZERO)))], |
| &[Some(FixedLenByteArray::from(ByteArray::from( |
| f16::NEG_ZERO, |
| )))], |
| &[Some(FixedLenByteArray::from(ByteArray::from(f16::NEG_ONE)))], |
| ]; |
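| |
| // Numerically these values are descending (1.0, 0.0, -0.0, -1.0), but |
| // compared as raw unsigned bytes the sign bit breaks that ordering |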
| |
| // f16 descending |
| let column_close_result = |
| write_multiple_pages::<FixedLenByteArrayType>(&f16_descr, values)?; |
| let boundary_order = column_close_result |
| .column_index |
| .unwrap() |
| .get_boundary_order(); |
| assert_eq!(boundary_order, Some(BoundaryOrder::DESCENDING)); |
| |
| // same bytes, but fba unordered |
| let column_close_result = |
| write_multiple_pages::<FixedLenByteArrayType>(&fba_descr, values)?; |
| let boundary_order = column_close_result |
| .column_index |
| .unwrap() |
| .get_boundary_order(); |
| assert_eq!(boundary_order, Some(BoundaryOrder::UNORDERED)); |
| |
| Ok(()) |
| } |
| |
| #[test] |
| fn test_interval_stats_should_not_have_min_max() { |
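| // The INTERVAL type has an undefined sort order in the Parquet spec, so |
| // min/max statistics must not be written for it |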
| let input = [ |
| vec![0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0], |
| vec![0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1], |
| vec![0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 2], |
| ] |
| .into_iter() |
| .map(|s| ByteArray::from(s).into()) |
| .collect::<Vec<_>>(); |
| |
| let page_writer = get_test_page_writer(); |
| let mut writer = get_test_interval_column_writer(page_writer); |
| writer.write_batch(&input, None, None).unwrap(); |
| |
| let metadata = writer.close().unwrap().metadata; |
| let stats = if let Some(Statistics::FixedLenByteArray(stats)) = metadata.statistics() { |
| stats.clone() |
| } else { |
| panic!("metadata missing statistics"); |
| }; |
| assert!(stats.min_bytes_opt().is_none()); |
| assert!(stats.max_bytes_opt().is_none()); |
| } |
| |
| #[test] |
| #[cfg(feature = "arrow")] |
| fn test_column_writer_get_estimated_total_bytes() { |
| let page_writer = get_test_page_writer(); |
| let props = Default::default(); |
| let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props); |
| assert_eq!(writer.get_estimated_total_bytes(), 0); |
| |
| writer.write_batch(&[1, 2, 3, 4], None, None).unwrap(); |
| writer.add_data_page().unwrap(); |
| let size_with_one_page = writer.get_estimated_total_bytes(); |
| assert_eq!(size_with_one_page, 20); |
| |
| writer.write_batch(&[5, 6, 7, 8], None, None).unwrap(); |
| writer.add_data_page().unwrap(); |
| let size_with_two_pages = writer.get_estimated_total_bytes(); |
| // different pages have different compressed lengths |
| assert_eq!(size_with_two_pages, 20 + 21); |
| } |
| |
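| /// Writes each slice in `pages` as its own data page (flushing after every |
| /// batch) and returns the column close result. |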
| fn write_multiple_pages<T: DataType>( |
| column_descr: &Arc<ColumnDescriptor>, |
| pages: &[&[Option<T::T>]], |
| ) -> Result<ColumnCloseResult> { |
| let column_writer = get_column_writer( |
| column_descr.clone(), |
| Default::default(), |
| get_test_page_writer(), |
| ); |
| let mut writer = get_typed_column_writer::<T>(column_writer); |
| |
| for &page in pages { |
| let values = page.iter().filter_map(Clone::clone).collect::<Vec<_>>(); |
| let def_levels = page |
| .iter() |
| .map(|maybe_value| if maybe_value.is_some() { 1 } else { 0 }) |
| .collect::<Vec<_>>(); |
| writer.write_batch(&values, Some(&def_levels), None)?; |
| writer.flush_data_pages()?; |
| } |
| |
| writer.close() |
| } |
| |
| /// Performs a write-read roundtrip with randomly generated values and levels. |
| /// `max_size` is the maximum number of values (or levels, if `max_def_level` > 0) |
| /// to write for a column. |
| fn column_roundtrip_random<T: DataType>( |
| props: WriterProperties, |
| max_size: usize, |
| min_value: T::T, |
| max_value: T::T, |
| max_def_level: i16, |
| max_rep_level: i16, |
| ) where |
| T::T: PartialOrd + SampleUniform + Copy, |
| { |
| let mut num_values: usize = 0; |
| |
| let mut buf: Vec<i16> = Vec::new(); |
| let def_levels = if max_def_level > 0 { |
| random_numbers_range(max_size, 0, max_def_level + 1, &mut buf); |
| for &dl in &buf[..] { |
| if dl == max_def_level { |
| num_values += 1; |
| } |
| } |
| Some(&buf[..]) |
| } else { |
| num_values = max_size; |
| None |
| }; |
| |
| let mut buf: Vec<i16> = Vec::new(); |
| let rep_levels = if max_rep_level > 0 { |
| random_numbers_range(max_size, 0, max_rep_level + 1, &mut buf); |
| buf[0] = 0; // Must start on record boundary |
| Some(&buf[..]) |
| } else { |
| None |
| }; |
| |
| let mut values: Vec<T::T> = Vec::new(); |
| random_numbers_range(num_values, min_value, max_value, &mut values); |
| |
| column_roundtrip::<T>(props, &values[..], def_levels, rep_levels); |
| } |
| |
| /// Performs a write-read roundtrip and asserts the written values and levels. |
| fn column_roundtrip<T: DataType>( |
| props: WriterProperties, |
| values: &[T::T], |
| def_levels: Option<&[i16]>, |
| rep_levels: Option<&[i16]>, |
| ) { |
| let mut file = tempfile::tempfile().unwrap(); |
| let mut write = TrackedWrite::new(&mut file); |
| let page_writer = Box::new(SerializedPageWriter::new(&mut write)); |
| |
| let max_def_level = match def_levels { |
| Some(buf) => *buf.iter().max().unwrap_or(&0i16), |
| None => 0i16, |
| }; |
| |
| let max_rep_level = match rep_levels { |
| Some(buf) => *buf.iter().max().unwrap_or(&0i16), |
| None => 0i16, |
| }; |
| |
| let mut max_batch_size = values.len(); |
| if let Some(levels) = def_levels { |
| max_batch_size = max_batch_size.max(levels.len()); |
| } |
| if let Some(levels) = rep_levels { |
| max_batch_size = max_batch_size.max(levels.len()); |
| } |
| |
| let mut writer = |
| get_test_column_writer::<T>(page_writer, max_def_level, max_rep_level, Arc::new(props)); |
| |
| let values_written = writer.write_batch(values, def_levels, rep_levels).unwrap(); |
| assert_eq!(values_written, values.len()); |
| let result = writer.close().unwrap(); |
| |
| drop(write); |
| |
| let props = ReaderProperties::builder() |
| .set_backward_compatible_lz4(false) |
| .build(); |
| let page_reader = Box::new( |
| SerializedPageReader::new_with_properties( |
| Arc::new(file), |
| &result.metadata, |
| result.rows_written as usize, |
| None, |
| Arc::new(props), |
| ) |
| .unwrap(), |
| ); |
| let mut reader = get_test_column_reader::<T>(page_reader, max_def_level, max_rep_level); |
| |
| let mut actual_values = Vec::with_capacity(max_batch_size); |
| let mut actual_def_levels = def_levels.map(|_| Vec::with_capacity(max_batch_size)); |
| let mut actual_rep_levels = rep_levels.map(|_| Vec::with_capacity(max_batch_size)); |
| |
| let (_, values_read, levels_read) = reader |
| .read_records( |
| max_batch_size, |
| actual_def_levels.as_mut(), |
| actual_rep_levels.as_mut(), |
| &mut actual_values, |
| ) |
| .unwrap(); |
| |
| // Assert values, definition and repetition levels. |
| |
| assert_eq!(&actual_values[..values_read], values); |
| match actual_def_levels { |
| Some(ref vec) => assert_eq!(Some(&vec[..levels_read]), def_levels), |
| None => assert_eq!(None, def_levels), |
| } |
| match actual_rep_levels { |
| Some(ref vec) => assert_eq!(Some(&vec[..levels_read]), rep_levels), |
| None => assert_eq!(None, rep_levels), |
| } |
| |
| // Assert written rows. |
| |
| if let Some(levels) = actual_rep_levels { |
| let mut actual_rows_written = 0; |
| for l in levels { |
| if l == 0 { |
| actual_rows_written += 1; |
| } |
| } |
| assert_eq!(actual_rows_written, result.rows_written); |
| } else if actual_def_levels.is_some() { |
| assert_eq!(levels_read as u64, result.rows_written); |
| } else { |
| assert_eq!(values_read as u64, result.rows_written); |
| } |
| } |
| |
| /// Writes the provided values and returns the resulting column chunk metadata. |
| /// Used to test the column writer's encoding support. |
| fn column_write_and_get_metadata<T: DataType>( |
| props: WriterProperties, |
| values: &[T::T], |
| ) -> ColumnChunkMetaData { |
| let page_writer = get_test_page_writer(); |
| let props = Arc::new(props); |
| let mut writer = get_test_column_writer::<T>(page_writer, 0, 0, props); |
| writer.write_batch(values, None, None).unwrap(); |
| writer.close().unwrap().metadata |
| } |
| |
| // Helper function to more compactly create a PageEncodingStats struct. |
| fn encoding_stats(page_type: PageType, encoding: Encoding, count: i32) -> PageEncodingStats { |
| PageEncodingStats { |
| page_type, |
| encoding, |
| count, |
| } |
| } |
| |
| // Function used in tests for EncodingWriteSupport. Checks the dictionary page |
| // offset and encodings to make sure the column writer uses the encodings |
| // provided by the trait. |
| fn check_encoding_write_support<T: DataType>( |
| version: WriterVersion, |
| dict_enabled: bool, |
| data: &[T::T], |
| dictionary_page_offset: Option<i64>, |
| encodings: &[Encoding], |
| page_encoding_stats: &[PageEncodingStats], |
| ) { |
| let props = WriterProperties::builder() |
| .set_writer_version(version) |
| .set_dictionary_enabled(dict_enabled) |
| .build(); |
| let meta = column_write_and_get_metadata::<T>(props, data); |
| assert_eq!(meta.dictionary_page_offset(), dictionary_page_offset); |
| assert_eq!(meta.encodings().collect::<Vec<_>>(), encodings); |
| assert_eq!(meta.page_encoding_stats().unwrap(), page_encoding_stats); |
| } |
| |
| /// Returns a test column writer. |
| fn get_test_column_writer<'a, T: DataType>( |
| page_writer: Box<dyn PageWriter + 'a>, |
| max_def_level: i16, |
| max_rep_level: i16, |
| props: WriterPropertiesPtr, |
| ) -> ColumnWriterImpl<'a, T> { |
| let descr = Arc::new(get_test_column_descr::<T>(max_def_level, max_rep_level)); |
| let column_writer = get_column_writer(descr, props, page_writer); |
| get_typed_column_writer::<T>(column_writer) |
| } |
| |
| /// Returns a test column reader. |
| fn get_test_column_reader<T: DataType>( |
| page_reader: Box<dyn PageReader>, |
| max_def_level: i16, |
| max_rep_level: i16, |
| ) -> ColumnReaderImpl<T> { |
| let descr = Arc::new(get_test_column_descr::<T>(max_def_level, max_rep_level)); |
| let column_reader = get_column_reader(descr, page_reader); |
| get_typed_column_reader::<T>(column_reader) |
| } |
| |
| /// Returns a descriptor for a primitive test column. |
| fn get_test_column_descr<T: DataType>( |
| max_def_level: i16, |
| max_rep_level: i16, |
| ) -> ColumnDescriptor { |
| let path = ColumnPath::from("col"); |
| let tpe = SchemaType::primitive_type_builder("col", T::get_physical_type()) |
| // The length is set for "encoding support" tests of the FIXED_LEN_BYTE_ARRAY |
| // type; it is a no-op for other types |
| .with_length(1) |
| .build() |
| .unwrap(); |
| ColumnDescriptor::new(Arc::new(tpe), max_def_level, max_rep_level, path) |
| } |
| |
| /// Returns a page writer that records page metadata without storing or |
| /// serializing the pages themselves. |
| fn get_test_page_writer() -> Box<dyn PageWriter> { |
| Box::new(TestPageWriter {}) |
| } |
| |
| struct TestPageWriter {} |
| |
| impl PageWriter for TestPageWriter { |
| fn write_page(&mut self, page: CompressedPage) -> Result<PageWriteSpec> { |
| let mut res = PageWriteSpec::new(); |
| res.page_type = page.page_type(); |
| res.uncompressed_size = page.uncompressed_size(); |
| res.compressed_size = page.compressed_size(); |
| res.num_values = page.num_values(); |
| res.offset = 0; |
| res.bytes_written = page.data().len() as u64; |
| Ok(res) |
| } |
| |
| fn close(&mut self) -> Result<()> { |
| Ok(()) |
| } |
| } |
| |
| /// Writes data into Parquet using [`get_test_page_writer`] and |
| /// [`get_test_column_writer`], returning the generated statistics. |
| fn statistics_roundtrip<T: DataType>(values: &[<T as DataType>::T]) -> Statistics { |
| let page_writer = get_test_page_writer(); |
| let props = Default::default(); |
| let mut writer = get_test_column_writer::<T>(page_writer, 0, 0, props); |
| writer.write_batch(values, None, None).unwrap(); |
| |
| let metadata = writer.close().unwrap().metadata; |
| if let Some(stats) = metadata.statistics() { |
| stats.clone() |
| } else { |
| panic!("metadata missing statistics"); |
| } |
| } |
| |
| /// Returns a Decimal column writer. |
| fn get_test_decimals_column_writer<T: DataType>( |
| page_writer: Box<dyn PageWriter>, |
| max_def_level: i16, |
| max_rep_level: i16, |
| props: WriterPropertiesPtr, |
| ) -> ColumnWriterImpl<'static, T> { |
| let descr = Arc::new(get_test_decimals_column_descr::<T>( |
| max_def_level, |
| max_rep_level, |
| )); |
| let column_writer = get_column_writer(descr, props, page_writer); |
| get_typed_column_writer::<T>(column_writer) |
| } |
| |
| /// Returns a descriptor for a Decimal primitive column. |
| fn get_test_decimals_column_descr<T: DataType>( |
| max_def_level: i16, |
| max_rep_level: i16, |
| ) -> ColumnDescriptor { |
| let path = ColumnPath::from("col"); |
| let tpe = SchemaType::primitive_type_builder("col", T::get_physical_type()) |
| .with_length(16) |
| .with_logical_type(Some(LogicalType::Decimal { |
| scale: 2, |
| precision: 3, |
| })) |
| .with_scale(2) |
| .with_precision(3) |
| .build() |
| .unwrap(); |
| ColumnDescriptor::new(Arc::new(tpe), max_def_level, max_rep_level, path) |
| } |
| |
| fn float16_statistics_roundtrip( |
| values: &[FixedLenByteArray], |
| ) -> ValueStatistics<FixedLenByteArray> { |
| let page_writer = get_test_page_writer(); |
| let mut writer = get_test_float16_column_writer(page_writer, Default::default()); |
| writer.write_batch(values, None, None).unwrap(); |
| |
| let metadata = writer.close().unwrap().metadata; |
| if let Some(Statistics::FixedLenByteArray(stats)) = metadata.statistics() { |
| stats.clone() |
| } else { |
| panic!("metadata missing statistics"); |
| } |
| } |
| |
| fn get_test_float16_column_writer( |
| page_writer: Box<dyn PageWriter>, |
| props: WriterPropertiesPtr, |
| ) -> ColumnWriterImpl<'static, FixedLenByteArrayType> { |
| let descr = Arc::new(get_test_float16_column_descr(0, 0)); |
| let column_writer = get_column_writer(descr, props, page_writer); |
| get_typed_column_writer::<FixedLenByteArrayType>(column_writer) |
| } |
| |
| fn get_test_float16_column_descr(max_def_level: i16, max_rep_level: i16) -> ColumnDescriptor { |
| let path = ColumnPath::from("col"); |
| let tpe = |
| SchemaType::primitive_type_builder("col", FixedLenByteArrayType::get_physical_type()) |
| .with_length(2) |
| .with_logical_type(Some(LogicalType::Float16)) |
| .build() |
| .unwrap(); |
| ColumnDescriptor::new(Arc::new(tpe), max_def_level, max_rep_level, path) |
| } |
| |
| fn get_test_interval_column_writer( |
| page_writer: Box<dyn PageWriter>, |
| ) -> ColumnWriterImpl<'static, FixedLenByteArrayType> { |
| let descr = Arc::new(get_test_interval_column_descr()); |
| let column_writer = get_column_writer(descr, Default::default(), page_writer); |
| get_typed_column_writer::<FixedLenByteArrayType>(column_writer) |
| } |
| |
| fn get_test_interval_column_descr() -> ColumnDescriptor { |
| let path = ColumnPath::from("col"); |
| let tpe = |
| SchemaType::primitive_type_builder("col", FixedLenByteArrayType::get_physical_type()) |
| .with_length(12) |
| .with_converted_type(ConvertedType::INTERVAL) |
| .build() |
| .unwrap(); |
| ColumnDescriptor::new(Arc::new(tpe), 0, 0, path) |
| } |
| |
| /// Returns a column writer for a UINT32 column specified via ConvertedType only |
| fn get_test_unsigned_int_given_as_converted_column_writer<'a, T: DataType>( |
| page_writer: Box<dyn PageWriter + 'a>, |
| max_def_level: i16, |
| max_rep_level: i16, |
| props: WriterPropertiesPtr, |
| ) -> ColumnWriterImpl<'a, T> { |
| let descr = Arc::new(get_test_converted_type_unsigned_integer_column_descr::<T>( |
| max_def_level, |
| max_rep_level, |
| )); |
| let column_writer = get_column_writer(descr, props, page_writer); |
| get_typed_column_writer::<T>(column_writer) |
| } |
| |
| /// Returns a column descriptor for a UINT32 column specified via ConvertedType only |
| fn get_test_converted_type_unsigned_integer_column_descr<T: DataType>( |
| max_def_level: i16, |
| max_rep_level: i16, |
| ) -> ColumnDescriptor { |
| let path = ColumnPath::from("col"); |
| let tpe = SchemaType::primitive_type_builder("col", T::get_physical_type()) |
| .with_converted_type(ConvertedType::UINT_32) |
| .build() |
| .unwrap(); |
| ColumnDescriptor::new(Arc::new(tpe), max_def_level, max_rep_level, path) |
| } |
| |
| #[test] |
| fn test_page_v2_snappy_compression_fallback() { |
| // Test that PageV2 sets is_compressed to false when Snappy compression increases data size |
| let page_writer = TestPageWriter {}; |
| |
| // Create WriterProperties with PageV2 and Snappy compression |
| let props = WriterProperties::builder() |
| .set_writer_version(WriterVersion::PARQUET_2_0) |
| // Disable dictionary to ensure data is written directly |
| .set_dictionary_enabled(false) |
| .set_compression(Compression::SNAPPY) |
| .build(); |
| |
| let mut column_writer = |
| get_test_column_writer::<ByteArrayType>(Box::new(page_writer), 0, 0, Arc::new(props)); |
| |
| // Create small, simple data that Snappy compression will likely increase in size |
| // due to compression overhead for very small data |
| let values = vec![ByteArray::from("a")]; |
| |
| column_writer.write_batch(&values, None, None).unwrap(); |
| |
| let result = column_writer.close().unwrap(); |
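| // When compression would enlarge the page, the data is stored |
| // uncompressed, so compressed and uncompressed sizes match |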
| assert_eq!( |
| result.metadata.uncompressed_size(), |
| result.metadata.compressed_size() |
| ); |
| } |
| } |