| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| //! Contains column writer API. |
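//!
//! A minimal usage sketch (illustrative only: `descr`, `props` and `page_writer`
//! are assumed to be constructed elsewhere, e.g. by the file writer):
//!
//! ```ignore
//! // Obtain a column writer matching the column's physical type,
//! // then downcast it to a typed writer.
//! let column_writer = get_column_writer(descr, props, page_writer);
//! let mut typed_writer = get_typed_column_writer::<Int32Type>(column_writer);
//!
//! // Write a batch of required (non-null, non-repeated) values.
//! typed_writer.write_batch(&[1, 2, 3], None, None)?;
//!
//! // Finalise the column chunk and collect metrics and metadata.
//! let (bytes_written, rows_written, metadata) = typed_writer.close()?;
//! ```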
| use std::{cmp, collections::VecDeque, convert::TryFrom, marker::PhantomData, sync::Arc}; |
| |
| use crate::basic::{Compression, Encoding, LogicalType, PageType, Type}; |
| use crate::column::page::{CompressedPage, Page, PageWriteSpec, PageWriter}; |
| use crate::compression::{create_codec, Codec}; |
| use crate::data_type::private::ParquetValueType; |
| use crate::data_type::AsBytes; |
| use crate::data_type::*; |
| use crate::encodings::{ |
| encoding::{get_encoder, DictEncoder, Encoder}, |
| levels::{max_buffer_size, LevelEncoder}, |
| }; |
| use crate::errors::{ParquetError, Result}; |
| use crate::file::statistics::Statistics; |
| use crate::file::{ |
| metadata::ColumnChunkMetaData, |
| properties::{WriterProperties, WriterPropertiesPtr, WriterVersion}, |
| }; |
| use crate::schema::types::ColumnDescPtr; |
| use crate::util::bit_util::FromBytes; |
| use crate::util::memory::{ByteBufferPtr, MemTracker}; |
| |
| /// Column writer for a Parquet type. |
| pub enum ColumnWriter { |
| BoolColumnWriter(ColumnWriterImpl<BoolType>), |
| Int32ColumnWriter(ColumnWriterImpl<Int32Type>), |
| Int64ColumnWriter(ColumnWriterImpl<Int64Type>), |
| Int96ColumnWriter(ColumnWriterImpl<Int96Type>), |
| FloatColumnWriter(ColumnWriterImpl<FloatType>), |
| DoubleColumnWriter(ColumnWriterImpl<DoubleType>), |
| ByteArrayColumnWriter(ColumnWriterImpl<ByteArrayType>), |
| FixedLenByteArrayColumnWriter(ColumnWriterImpl<FixedLenByteArrayType>), |
| } |
| |
/// Level of statistics aggregation: either per page or per column chunk.
pub enum Level {
| Page, |
| Column, |
| } |
| |
| macro_rules! gen_stats_section { |
| ($physical_ty: ty, $stat_fn: ident, $min: ident, $max: ident, $distinct: ident, $nulls: ident) => {{ |
        let min = $min.as_ref().map(|v| {
            read_num_bytes!(
                $physical_ty,
                v.as_bytes().len(),
                &v.as_bytes()
            )
        });
        let max = $max.as_ref().map(|v| {
            read_num_bytes!(
                $physical_ty,
                v.as_bytes().len(),
                &v.as_bytes()
            )
        });
| Statistics::$stat_fn(min, max, $distinct, $nulls, false) |
| }}; |
| } |
| |
| /// Gets a specific column writer corresponding to column descriptor `descr`. |
| pub fn get_column_writer( |
| descr: ColumnDescPtr, |
| props: WriterPropertiesPtr, |
| page_writer: Box<dyn PageWriter>, |
| ) -> ColumnWriter { |
| match descr.physical_type() { |
| Type::BOOLEAN => ColumnWriter::BoolColumnWriter(ColumnWriterImpl::new( |
| descr, |
| props, |
| page_writer, |
| )), |
| Type::INT32 => ColumnWriter::Int32ColumnWriter(ColumnWriterImpl::new( |
| descr, |
| props, |
| page_writer, |
| )), |
| Type::INT64 => ColumnWriter::Int64ColumnWriter(ColumnWriterImpl::new( |
| descr, |
| props, |
| page_writer, |
| )), |
| Type::INT96 => ColumnWriter::Int96ColumnWriter(ColumnWriterImpl::new( |
| descr, |
| props, |
| page_writer, |
| )), |
| Type::FLOAT => ColumnWriter::FloatColumnWriter(ColumnWriterImpl::new( |
| descr, |
| props, |
| page_writer, |
| )), |
| Type::DOUBLE => ColumnWriter::DoubleColumnWriter(ColumnWriterImpl::new( |
| descr, |
| props, |
| page_writer, |
| )), |
| Type::BYTE_ARRAY => ColumnWriter::ByteArrayColumnWriter(ColumnWriterImpl::new( |
| descr, |
| props, |
| page_writer, |
| )), |
| Type::FIXED_LEN_BYTE_ARRAY => ColumnWriter::FixedLenByteArrayColumnWriter( |
| ColumnWriterImpl::new(descr, props, page_writer), |
| ), |
| } |
| } |
| |
| /// Gets a typed column writer for the specific type `T`, by "up-casting" `col_writer` of |
| /// non-generic type to a generic column writer type `ColumnWriterImpl`. |
| /// |
| /// Panics if actual enum value for `col_writer` does not match the type `T`. |
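///
/// A sketch of typical use (illustrative; assumes `col_writer` wraps an INT32
/// column):
///
/// ```ignore
/// let mut typed = get_typed_column_writer::<Int32Type>(col_writer);
/// typed.write_batch(&[1, 2, 3], None, None)?;
/// ```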
| pub fn get_typed_column_writer<T: DataType>( |
| col_writer: ColumnWriter, |
| ) -> ColumnWriterImpl<T> { |
| T::get_column_writer(col_writer).unwrap_or_else(|| { |
| panic!( |
| "Failed to convert column writer into a typed column writer for `{}` type", |
| T::get_physical_type() |
| ) |
| }) |
| } |
| |
| /// Similar to `get_typed_column_writer` but returns a reference. |
| pub fn get_typed_column_writer_ref<T: DataType>( |
| col_writer: &ColumnWriter, |
| ) -> &ColumnWriterImpl<T> { |
| T::get_column_writer_ref(col_writer).unwrap_or_else(|| { |
| panic!( |
| "Failed to convert column writer into a typed column writer for `{}` type", |
| T::get_physical_type() |
| ) |
| }) |
| } |
| |
/// Similar to `get_typed_column_writer` but returns a mutable reference.
| pub fn get_typed_column_writer_mut<T: DataType>( |
| col_writer: &mut ColumnWriter, |
| ) -> &mut ColumnWriterImpl<T> { |
| T::get_column_writer_mut(col_writer).unwrap_or_else(|| { |
| panic!( |
| "Failed to convert column writer into a typed column writer for `{}` type", |
| T::get_physical_type() |
| ) |
| }) |
| } |
| |
| /// Typed column writer for a primitive column. |
| pub struct ColumnWriterImpl<T: DataType> { |
| // Column writer properties |
| descr: ColumnDescPtr, |
| props: WriterPropertiesPtr, |
| page_writer: Box<dyn PageWriter>, |
| has_dictionary: bool, |
| dict_encoder: Option<DictEncoder<T>>, |
| encoder: Box<dyn Encoder<T>>, |
| codec: Compression, |
| compressor: Option<Box<dyn Codec>>, |
| // Metrics per page |
| num_buffered_values: u32, |
| num_buffered_encoded_values: u32, |
| num_buffered_rows: u32, |
| min_page_value: Option<T::T>, |
| max_page_value: Option<T::T>, |
| num_page_nulls: u64, |
| page_distinct_count: Option<u64>, |
| // Metrics per column writer |
| total_bytes_written: u64, |
| total_rows_written: u64, |
| total_uncompressed_size: u64, |
| total_compressed_size: u64, |
| total_num_values: u64, |
| dictionary_page_offset: Option<u64>, |
| data_page_offset: Option<u64>, |
| min_column_value: Option<T::T>, |
| max_column_value: Option<T::T>, |
| num_column_nulls: u64, |
| column_distinct_count: Option<u64>, |
| // Reused buffers |
| def_levels_sink: Vec<i16>, |
| rep_levels_sink: Vec<i16>, |
| data_pages: VecDeque<CompressedPage>, |
| _phantom: PhantomData<T>, |
| } |
| |
| impl<T: DataType> ColumnWriterImpl<T> { |
| pub fn new( |
| descr: ColumnDescPtr, |
| props: WriterPropertiesPtr, |
| page_writer: Box<dyn PageWriter>, |
| ) -> Self { |
| let codec = props.compression(descr.path()); |
| let compressor = create_codec(codec).unwrap(); |
| |
| // Optionally set dictionary encoder. |
| let dict_encoder = if props.dictionary_enabled(descr.path()) |
| && has_dictionary_support(T::get_physical_type(), &props) |
| { |
| Some(DictEncoder::new(descr.clone(), Arc::new(MemTracker::new()))) |
| } else { |
| None |
| }; |
| |
| // Whether or not this column writer has a dictionary encoding. |
| let has_dictionary = dict_encoder.is_some(); |
| |
| // Set either main encoder or fallback encoder. |
| let fallback_encoder = get_encoder( |
| descr.clone(), |
| props |
| .encoding(descr.path()) |
| .unwrap_or_else(|| fallback_encoding(T::get_physical_type(), &props)), |
| Arc::new(MemTracker::new()), |
| ) |
| .unwrap(); |
| |
| Self { |
| descr, |
| props, |
| page_writer, |
| has_dictionary, |
| dict_encoder, |
| encoder: fallback_encoder, |
| codec, |
| compressor, |
| num_buffered_values: 0, |
| num_buffered_encoded_values: 0, |
| num_buffered_rows: 0, |
| total_bytes_written: 0, |
| total_rows_written: 0, |
| total_uncompressed_size: 0, |
| total_compressed_size: 0, |
| total_num_values: 0, |
| dictionary_page_offset: None, |
| data_page_offset: None, |
| def_levels_sink: vec![], |
| rep_levels_sink: vec![], |
| data_pages: VecDeque::new(), |
| min_page_value: None, |
| max_page_value: None, |
| num_page_nulls: 0, |
| page_distinct_count: None, |
| min_column_value: None, |
| max_column_value: None, |
| num_column_nulls: 0, |
| column_distinct_count: None, |
| _phantom: PhantomData, |
| } |
| } |
| |
| fn write_batch_internal( |
| &mut self, |
| values: &[T::T], |
| def_levels: Option<&[i16]>, |
| rep_levels: Option<&[i16]>, |
| min: &Option<T::T>, |
| max: &Option<T::T>, |
| null_count: Option<u64>, |
| distinct_count: Option<u64>, |
| ) -> Result<usize> { |
        // We check for DataPage limits only after we have inserted the values. If a
        // user writes a large number of values, the DataPage size can be well above
        // the limit.
        //
        // The purpose of this chunking is to bound this. Even if a user writes a
        // large number of values, chunking ensures that we add data pages at a
        // reasonable page size limit.
| |
| // TODO: find out why we don't account for size of levels when we estimate page |
| // size. |
| |
        // Find out the minimal length to prevent index out-of-bounds errors.
| let mut min_len = values.len(); |
| if let Some(levels) = def_levels { |
| min_len = cmp::min(min_len, levels.len()); |
| } |
| if let Some(levels) = rep_levels { |
| min_len = cmp::min(min_len, levels.len()); |
| } |
| |
        // Find out the number of batches to process.
| let write_batch_size = self.props.write_batch_size(); |
| let num_batches = min_len / write_batch_size; |
| |
| // Process pre-calculated statistics |
| match (min, max) { |
| (Some(min), Some(max)) => { |
| if self |
| .min_column_value |
| .as_ref() |
| .map_or(true, |v| self.compare_greater(v, min)) |
| { |
| self.min_column_value = Some(min.clone()); |
| } |
| if self |
| .max_column_value |
| .as_ref() |
| .map_or(true, |v| self.compare_greater(max, v)) |
| { |
| self.max_column_value = Some(max.clone()); |
| } |
| } |
| (None, Some(_)) | (Some(_), None) => { |
| panic!("min/max should be both set or both None") |
| } |
| (None, None) => {} |
| } |
| |
| if let Some(distinct) = distinct_count { |
| self.column_distinct_count = |
| Some(self.column_distinct_count.unwrap_or(0) + distinct); |
| } |
| |
| if let Some(nulls) = null_count { |
| self.num_column_nulls += nulls; |
| } |
| |
| let calculate_page_stats = (min.is_none() || max.is_none()) |
| && null_count.is_none() |
| && distinct_count.is_none(); |
| |
| let mut values_offset = 0; |
| let mut levels_offset = 0; |
| for _ in 0..num_batches { |
| values_offset += self.write_mini_batch( |
| &values[values_offset..values_offset + write_batch_size], |
| def_levels.map(|lv| &lv[levels_offset..levels_offset + write_batch_size]), |
| rep_levels.map(|lv| &lv[levels_offset..levels_offset + write_batch_size]), |
| calculate_page_stats, |
| )?; |
| levels_offset += write_batch_size; |
| } |
| |
| values_offset += self.write_mini_batch( |
| &values[values_offset..], |
| def_levels.map(|lv| &lv[levels_offset..]), |
| rep_levels.map(|lv| &lv[levels_offset..]), |
| calculate_page_stats, |
| )?; |
| |
| // Return total number of values processed. |
| Ok(values_offset) |
| } |
| |
| /// Writes batch of values, definition levels and repetition levels. |
| /// Returns number of values processed (written). |
| /// |
    /// If definition and repetition levels are provided, we write those levels in
    /// full and select how many values to write (this number will be returned),
    /// since the number of actually written values may be smaller than the number
    /// of provided values.
    ///
    /// If only values are provided, then all values are written and the length of
    /// the values buffer is returned.
| /// |
| /// Definition and/or repetition levels can be omitted, if values are |
| /// non-nullable and/or non-repeated. |
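    ///
    /// For example, for an optional column (max definition level 1), two non-null
    /// values interleaved with two nulls could be written as below (sketch only;
    /// `writer` is assumed to be a `ColumnWriterImpl<Int32Type>`):
    ///
    /// ```ignore
    /// // Definition level 1 marks a present value, 0 marks a null.
    /// let written = writer.write_batch(&[10, 20], Some(&[1, 0, 1, 0]), None)?;
    /// assert_eq!(written, 2); // only the two non-null values were consumed
    /// ```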
| pub fn write_batch( |
| &mut self, |
| values: &[T::T], |
| def_levels: Option<&[i16]>, |
| rep_levels: Option<&[i16]>, |
| ) -> Result<usize> { |
| self.write_batch_internal( |
| values, def_levels, rep_levels, &None, &None, None, None, |
| ) |
| } |
| |
    /// Writes batch of values, definition levels and repetition levels, together
    /// with pre-calculated statistics for this batch. In this case page-level
    /// statistics are not computed, as that would defeat the purpose of speeding
    /// up the write process with pre-calculated statistics.
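    ///
    /// Sketch of a call with pre-calculated statistics (values are illustrative;
    /// the caller is responsible for supplying counts consistent with the data):
    ///
    /// ```ignore
    /// writer.write_batch_with_statistics(
    ///     &[1, 2, 3],
    ///     None,          // no definition levels
    ///     None,          // no repetition levels
    ///     &Some(1),      // pre-calculated min
    ///     &Some(3),      // pre-calculated max
    ///     Some(0),       // null count
    ///     Some(3),       // distinct count
    /// )?;
    /// ```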
| pub fn write_batch_with_statistics( |
| &mut self, |
| values: &[T::T], |
| def_levels: Option<&[i16]>, |
| rep_levels: Option<&[i16]>, |
| min: &Option<T::T>, |
| max: &Option<T::T>, |
| nulls_count: Option<u64>, |
| distinct_count: Option<u64>, |
| ) -> Result<usize> { |
| self.write_batch_internal( |
| values, |
| def_levels, |
| rep_levels, |
| min, |
| max, |
| nulls_count, |
| distinct_count, |
| ) |
| } |
| |
| /// Returns total number of bytes written by this column writer so far. |
| /// This value is also returned when column writer is closed. |
| pub fn get_total_bytes_written(&self) -> u64 { |
| self.total_bytes_written |
| } |
| |
| /// Returns total number of rows written by this column writer so far. |
| /// This value is also returned when column writer is closed. |
| pub fn get_total_rows_written(&self) -> u64 { |
| self.total_rows_written |
| } |
| |
| /// Finalises writes and closes the column writer. |
| /// Returns total bytes written, total rows written and column chunk metadata. |
| pub fn close(mut self) -> Result<(u64, u64, ColumnChunkMetaData)> { |
| if self.dict_encoder.is_some() { |
| self.write_dictionary_page()?; |
| } |
| self.flush_data_pages()?; |
| let metadata = self.write_column_metadata()?; |
| self.dict_encoder = None; |
| self.page_writer.close()?; |
| |
| Ok((self.total_bytes_written, self.total_rows_written, metadata)) |
| } |
| |
    /// Writes a mini batch of values, definition and repetition levels.
    /// This allows fine-grained processing of values while maintaining a
    /// reasonable page size.
| fn write_mini_batch( |
| &mut self, |
| values: &[T::T], |
| def_levels: Option<&[i16]>, |
| rep_levels: Option<&[i16]>, |
| calculate_page_stats: bool, |
| ) -> Result<usize> { |
| let mut values_to_write = 0; |
| |
        // Check that the number of definition levels matches the number of
        // repetition levels.
| if let (Some(def), Some(rep)) = (def_levels, rep_levels) { |
| if def.len() != rep.len() { |
| return Err(general_err!( |
| "Inconsistent length of definition and repetition levels: {} != {}", |
| def.len(), |
| rep.len() |
| )); |
| } |
| } |
| |
| // Process definition levels and determine how many values to write. |
| let num_values = if self.descr.max_def_level() > 0 { |
| let levels = def_levels.ok_or_else(|| { |
| general_err!( |
| "Definition levels are required, because max definition level = {}", |
| self.descr.max_def_level() |
| ) |
| })?; |
| |
| for &level in levels { |
| if level == self.descr.max_def_level() { |
| values_to_write += 1; |
| } else if calculate_page_stats { |
| self.num_page_nulls += 1 |
| } |
| } |
| |
| self.write_definition_levels(levels); |
| u32::try_from(levels.len()).unwrap() |
| } else { |
| values_to_write = values.len(); |
| u32::try_from(values_to_write).unwrap() |
| }; |
| |
| // Process repetition levels and determine how many rows we are about to process. |
| if self.descr.max_rep_level() > 0 { |
| // A row could contain more than one value. |
| let levels = rep_levels.ok_or_else(|| { |
| general_err!( |
| "Repetition levels are required, because max repetition level = {}", |
| self.descr.max_rep_level() |
| ) |
| })?; |
| |
| // Count the occasions where we start a new row |
| for &level in levels { |
| self.num_buffered_rows += (level == 0) as u32 |
| } |
| |
| self.write_repetition_levels(levels); |
| } else { |
            // Each value is exactly one row.
            // The row count equals the number of values; nulls are counted as well.
| self.num_buffered_rows += num_values; |
| } |
| |
| // Check that we have enough values to write. |
| let values_to_write = values.get(0..values_to_write).ok_or_else(|| { |
| general_err!( |
| "Expected to write {} values, but have only {}", |
| values_to_write, |
| values.len() |
| ) |
| })?; |
| |
| if calculate_page_stats { |
| for val in values_to_write { |
| self.update_page_min_max(val); |
| } |
| } |
| |
| self.write_values(values_to_write)?; |
| |
| self.num_buffered_values += num_values; |
| self.num_buffered_encoded_values += u32::try_from(values_to_write.len()).unwrap(); |
| |
| if self.should_add_data_page() { |
| self.add_data_page(calculate_page_stats)?; |
| } |
| |
| if self.should_dict_fallback() { |
| self.dict_fallback()?; |
| } |
| |
| Ok(values_to_write.len()) |
| } |
| |
| #[inline] |
| fn write_definition_levels(&mut self, def_levels: &[i16]) { |
| self.def_levels_sink.extend_from_slice(def_levels); |
| } |
| |
| #[inline] |
| fn write_repetition_levels(&mut self, rep_levels: &[i16]) { |
| self.rep_levels_sink.extend_from_slice(rep_levels); |
| } |
| |
| #[inline] |
| fn write_values(&mut self, values: &[T::T]) -> Result<()> { |
| match self.dict_encoder { |
| Some(ref mut encoder) => encoder.put(values), |
| None => self.encoder.put(values), |
| } |
| } |
| |
| /// Returns true if we need to fall back to non-dictionary encoding. |
| /// |
    /// We can only fall back if the dictionary encoder is set and the dictionary
    /// size has exceeded the configured limit.
| #[inline] |
| fn should_dict_fallback(&self) -> bool { |
| match self.dict_encoder { |
| Some(ref encoder) => { |
| encoder.dict_encoded_size() >= self.props.dictionary_pagesize_limit() |
| } |
| None => false, |
| } |
| } |
| |
| /// Returns true if there is enough data for a data page, false otherwise. |
| #[inline] |
| fn should_add_data_page(&self) -> bool { |
| match self.dict_encoder { |
| Some(ref encoder) => { |
| encoder.estimated_data_encoded_size() >= self.props.data_pagesize_limit() |
| } |
| None => { |
| self.encoder.estimated_data_encoded_size() |
| >= self.props.data_pagesize_limit() |
| } |
| } |
| } |
| |
| /// Performs dictionary fallback. |
| /// Prepares and writes dictionary and all data pages into page writer. |
| fn dict_fallback(&mut self) -> Result<()> { |
| // At this point we know that we need to fall back. |
| self.write_dictionary_page()?; |
| self.flush_data_pages()?; |
| self.dict_encoder = None; |
| Ok(()) |
| } |
| |
    /// Adds a data page.
    /// The page is either buffered (when dictionary encoding is in use) or written
    /// directly to the sink.
| fn add_data_page(&mut self, calculate_page_stat: bool) -> Result<()> { |
| // Extract encoded values |
| let value_bytes = match self.dict_encoder { |
| Some(ref mut encoder) => encoder.write_indices()?, |
| None => self.encoder.flush_buffer()?, |
| }; |
| |
| // Select encoding based on current encoder and writer version (v1 or v2). |
| let encoding = if self.dict_encoder.is_some() { |
| self.props.dictionary_data_page_encoding() |
| } else { |
| self.encoder.encoding() |
| }; |
| |
| let max_def_level = self.descr.max_def_level(); |
| let max_rep_level = self.descr.max_rep_level(); |
| |
        // Always update the column NULL count, regardless of whether page stats are used.
| self.num_column_nulls += self.num_page_nulls; |
| |
| let page_statistics = if calculate_page_stat { |
| self.update_column_min_max(); |
| Some(self.make_page_statistics()) |
| } else { |
| None |
| }; |
| |
| let compressed_page = match self.props.writer_version() { |
| WriterVersion::PARQUET_1_0 => { |
| let mut buffer = vec![]; |
| |
| if max_rep_level > 0 { |
| buffer.extend_from_slice( |
| &self.encode_levels_v1( |
| Encoding::RLE, |
| &self.rep_levels_sink[..], |
| max_rep_level, |
| )?[..], |
| ); |
| } |
| |
| if max_def_level > 0 { |
| buffer.extend_from_slice( |
| &self.encode_levels_v1( |
| Encoding::RLE, |
| &self.def_levels_sink[..], |
| max_def_level, |
| )?[..], |
| ); |
| } |
| |
| buffer.extend_from_slice(value_bytes.data()); |
| let uncompressed_size = buffer.len(); |
| |
| if let Some(ref mut cmpr) = self.compressor { |
| let mut compressed_buf = Vec::with_capacity(value_bytes.data().len()); |
| cmpr.compress(&buffer[..], &mut compressed_buf)?; |
| buffer = compressed_buf; |
| } |
| |
| let data_page = Page::DataPage { |
| buf: ByteBufferPtr::new(buffer), |
| num_values: self.num_buffered_values, |
| encoding, |
| def_level_encoding: Encoding::RLE, |
| rep_level_encoding: Encoding::RLE, |
| statistics: page_statistics, |
| }; |
| |
| CompressedPage::new(data_page, uncompressed_size) |
| } |
| WriterVersion::PARQUET_2_0 => { |
| let mut rep_levels_byte_len = 0; |
| let mut def_levels_byte_len = 0; |
| let mut buffer = vec![]; |
| |
| if max_rep_level > 0 { |
| let levels = |
| self.encode_levels_v2(&self.rep_levels_sink[..], max_rep_level)?; |
| rep_levels_byte_len = levels.len(); |
| buffer.extend_from_slice(&levels[..]); |
| } |
| |
| if max_def_level > 0 { |
| let levels = |
| self.encode_levels_v2(&self.def_levels_sink[..], max_def_level)?; |
| def_levels_byte_len = levels.len(); |
| buffer.extend_from_slice(&levels[..]); |
| } |
| |
| let uncompressed_size = |
| rep_levels_byte_len + def_levels_byte_len + value_bytes.len(); |
| |
| // Data Page v2 compresses values only. |
| match self.compressor { |
| Some(ref mut cmpr) => { |
| cmpr.compress(value_bytes.data(), &mut buffer)?; |
| } |
| None => buffer.extend_from_slice(value_bytes.data()), |
| } |
| |
| let data_page = Page::DataPageV2 { |
| buf: ByteBufferPtr::new(buffer), |
| num_values: self.num_buffered_values, |
| encoding, |
| num_nulls: self.num_buffered_values |
| - self.num_buffered_encoded_values, |
| num_rows: self.num_buffered_rows, |
| def_levels_byte_len: def_levels_byte_len as u32, |
| rep_levels_byte_len: rep_levels_byte_len as u32, |
| is_compressed: self.compressor.is_some(), |
| statistics: page_statistics, |
| }; |
| |
| CompressedPage::new(data_page, uncompressed_size) |
| } |
| }; |
| |
| // Check if we need to buffer data page or flush it to the sink directly. |
| if self.dict_encoder.is_some() { |
| self.data_pages.push_back(compressed_page); |
| } else { |
| self.write_data_page(compressed_page)?; |
| } |
| |
| // Update total number of rows. |
| self.total_rows_written += self.num_buffered_rows as u64; |
| |
| // Reset state. |
| self.rep_levels_sink.clear(); |
| self.def_levels_sink.clear(); |
| self.num_buffered_values = 0; |
| self.num_buffered_encoded_values = 0; |
| self.num_buffered_rows = 0; |
| self.min_page_value = None; |
| self.max_page_value = None; |
| self.num_page_nulls = 0; |
| self.page_distinct_count = None; |
| |
| Ok(()) |
| } |
| |
    /// Finalises any outstanding data pages and flushes buffered data pages from
    /// dictionary encoding into the underlying sink.
| #[inline] |
| fn flush_data_pages(&mut self) -> Result<()> { |
| // Write all outstanding data to a new page. |
| let calculate_page_stats = |
| self.min_page_value.is_some() && self.max_page_value.is_some(); |
| if self.num_buffered_values > 0 { |
| self.add_data_page(calculate_page_stats)?; |
| } |
| |
| while let Some(page) = self.data_pages.pop_front() { |
| self.write_data_page(page)?; |
| } |
| |
| Ok(()) |
| } |
| |
| /// Assembles and writes column chunk metadata. |
| fn write_column_metadata(&mut self) -> Result<ColumnChunkMetaData> { |
| let total_compressed_size = self.total_compressed_size as i64; |
| let total_uncompressed_size = self.total_uncompressed_size as i64; |
| let num_values = self.total_num_values as i64; |
| let dict_page_offset = self.dictionary_page_offset.map(|v| v as i64); |
| // If data page offset is not set, then no pages have been written |
| let data_page_offset = self.data_page_offset.unwrap_or(0) as i64; |
| |
| let file_offset; |
| let mut encodings = Vec::new(); |
| |
| if self.has_dictionary { |
| assert!(dict_page_offset.is_some(), "Dictionary offset is not set"); |
| file_offset = dict_page_offset.unwrap() + total_compressed_size; |
| // NOTE: This should be in sync with writing dictionary pages. |
| encodings.push(self.props.dictionary_page_encoding()); |
| encodings.push(self.props.dictionary_data_page_encoding()); |
            // If a fallback to an alternative encoding happened, add it to the list.
| if self.dict_encoder.is_none() { |
| encodings.push(self.encoder.encoding()); |
| } |
| } else { |
| file_offset = data_page_offset + total_compressed_size; |
| encodings.push(self.encoder.encoding()); |
| } |
| // We use only RLE level encoding for data page v1 and data page v2. |
| encodings.push(Encoding::RLE); |
| |
| let statistics = self.make_column_statistics(); |
| let metadata = ColumnChunkMetaData::builder(self.descr.clone()) |
| .set_compression(self.codec) |
| .set_encodings(encodings) |
| .set_file_offset(file_offset) |
| .set_total_compressed_size(total_compressed_size) |
| .set_total_uncompressed_size(total_uncompressed_size) |
| .set_num_values(num_values) |
| .set_data_page_offset(data_page_offset) |
| .set_dictionary_page_offset(dict_page_offset) |
| .set_statistics(statistics) |
| .build()?; |
| |
| self.page_writer.write_metadata(&metadata)?; |
| |
| Ok(metadata) |
| } |
| |
| /// Encodes definition or repetition levels for Data Page v1. |
| #[inline] |
| fn encode_levels_v1( |
| &self, |
| encoding: Encoding, |
| levels: &[i16], |
| max_level: i16, |
| ) -> Result<Vec<u8>> { |
| let size = max_buffer_size(encoding, max_level, levels.len()); |
| let mut encoder = LevelEncoder::v1(encoding, max_level, vec![0; size]); |
        encoder.put(levels)?;
| encoder.consume() |
| } |
| |
| /// Encodes definition or repetition levels for Data Page v2. |
| /// Encoding is always RLE. |
| #[inline] |
| fn encode_levels_v2(&self, levels: &[i16], max_level: i16) -> Result<Vec<u8>> { |
| let size = max_buffer_size(Encoding::RLE, max_level, levels.len()); |
| let mut encoder = LevelEncoder::v2(max_level, vec![0; size]); |
        encoder.put(levels)?;
| encoder.consume() |
| } |
| |
| /// Writes compressed data page into underlying sink and updates global metrics. |
| #[inline] |
| fn write_data_page(&mut self, page: CompressedPage) -> Result<()> { |
| let page_spec = self.page_writer.write_page(page)?; |
| self.update_metrics_for_page(page_spec); |
| Ok(()) |
| } |
| |
| /// Writes dictionary page into underlying sink. |
| #[inline] |
| fn write_dictionary_page(&mut self) -> Result<()> { |
| let compressed_page = { |
| let encoder = self |
| .dict_encoder |
| .as_ref() |
| .ok_or_else(|| general_err!("Dictionary encoder is not set"))?; |
| |
| let is_sorted = encoder.is_sorted(); |
| let num_values = encoder.num_entries(); |
| let mut values_buf = encoder.write_dict()?; |
| let uncompressed_size = values_buf.len(); |
| |
| if let Some(ref mut cmpr) = self.compressor { |
| let mut output_buf = Vec::with_capacity(uncompressed_size); |
| cmpr.compress(values_buf.data(), &mut output_buf)?; |
| values_buf = ByteBufferPtr::new(output_buf); |
| } |
| |
| let dict_page = Page::DictionaryPage { |
| buf: values_buf, |
| num_values: num_values as u32, |
| encoding: self.props.dictionary_page_encoding(), |
| is_sorted, |
| }; |
| CompressedPage::new(dict_page, uncompressed_size) |
| }; |
| |
| let page_spec = self.page_writer.write_page(compressed_page)?; |
| self.update_metrics_for_page(page_spec); |
| Ok(()) |
| } |
| |
    /// Updates column writer metrics with the metadata of each written page.
| #[inline] |
| fn update_metrics_for_page(&mut self, page_spec: PageWriteSpec) { |
| self.total_uncompressed_size += page_spec.uncompressed_size as u64; |
| self.total_compressed_size += page_spec.compressed_size as u64; |
| self.total_num_values += page_spec.num_values as u64; |
| self.total_bytes_written += page_spec.bytes_written; |
| |
| match page_spec.page_type { |
| PageType::DATA_PAGE | PageType::DATA_PAGE_V2 => { |
| if self.data_page_offset.is_none() { |
| self.data_page_offset = Some(page_spec.offset); |
| } |
| } |
| PageType::DICTIONARY_PAGE => { |
| assert!( |
| self.dictionary_page_offset.is_none(), |
| "Dictionary offset is already set" |
| ); |
| self.dictionary_page_offset = Some(page_spec.offset); |
| } |
| _ => {} |
| } |
| } |
| |
    /// Returns a reference to the underlying page writer.
    /// This method is intended for use in tests only.
| fn get_page_writer_ref(&self) -> &dyn PageWriter { |
| self.page_writer.as_ref() |
| } |
| |
| fn make_column_statistics(&self) -> Statistics { |
| self.make_typed_statistics(Level::Column) |
| } |
| |
| fn make_page_statistics(&self) -> Statistics { |
| self.make_typed_statistics(Level::Page) |
| } |
| |
| pub fn make_typed_statistics(&self, level: Level) -> Statistics { |
| let (min, max, distinct, nulls) = match level { |
| Level::Page => ( |
| self.min_page_value.as_ref(), |
| self.max_page_value.as_ref(), |
| self.page_distinct_count, |
| self.num_page_nulls, |
| ), |
| Level::Column => ( |
| self.min_column_value.as_ref(), |
| self.max_column_value.as_ref(), |
| self.column_distinct_count, |
| self.num_column_nulls, |
| ), |
| }; |
| match self.descr.physical_type() { |
| Type::INT32 => gen_stats_section!(i32, int32, min, max, distinct, nulls), |
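            // Boolean min/max values are widened and stored using the int32 statistics variant.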
| Type::BOOLEAN => gen_stats_section!(i32, int32, min, max, distinct, nulls), |
| Type::INT64 => gen_stats_section!(i64, int64, min, max, distinct, nulls), |
| Type::INT96 => gen_stats_section!(Int96, int96, min, max, distinct, nulls), |
| Type::FLOAT => gen_stats_section!(f32, float, min, max, distinct, nulls), |
| Type::DOUBLE => gen_stats_section!(f64, double, min, max, distinct, nulls), |
| Type::BYTE_ARRAY | Type::FIXED_LEN_BYTE_ARRAY => { |
| let min = min.as_ref().map(|v| ByteArray::from(v.as_bytes().to_vec())); |
| let max = max.as_ref().map(|v| ByteArray::from(v.as_bytes().to_vec())); |
| Statistics::byte_array(min, max, distinct, nulls, false) |
| } |
| } |
| } |
| |
| #[allow(clippy::eq_op)] |
| fn update_page_min_max(&mut self, val: &T::T) { |
| // simple "isNaN" check that works for all types |
| if val == val { |
| if self |
| .min_page_value |
| .as_ref() |
| .map_or(true, |min| self.compare_greater(min, val)) |
| { |
| self.min_page_value = Some(val.clone()); |
| } |
| if self |
| .max_page_value |
| .as_ref() |
| .map_or(true, |max| self.compare_greater(val, max)) |
| { |
| self.max_page_value = Some(val.clone()); |
| } |
| } |
| } |
| |
| fn update_column_min_max(&mut self) { |
| let update_min = self.min_column_value.as_ref().map_or(true, |min| { |
| let page_value = self.min_page_value.as_ref().unwrap(); |
| self.compare_greater(min, page_value) |
| }); |
| if update_min { |
| self.min_column_value = self.min_page_value.clone(); |
| } |
| |
| let update_max = self.max_column_value.as_ref().map_or(true, |max| { |
| let page_value = self.max_page_value.as_ref().unwrap(); |
| self.compare_greater(page_value, max) |
| }); |
| if update_max { |
| self.max_column_value = self.max_page_value.clone(); |
| } |
| } |
| |
| /// Evaluate `a > b` according to underlying logical type. |
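    ///
    /// For example, for a column with an unsigned INTEGER logical type, `-1i32`
    /// (bit pattern `0xFFFF_FFFF`) is compared as an unsigned value and is
    /// therefore greater than `1i32`.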
| fn compare_greater(&self, a: &T::T, b: &T::T) -> bool { |
| if let Some(LogicalType::INTEGER(int_type)) = self.descr.logical_type() { |
| if !int_type.is_signed { |
| // need to compare unsigned |
| return a.as_u64().unwrap() > b.as_u64().unwrap(); |
| } |
| } |
| a > b |
| } |
| } |
| |
| // ---------------------------------------------------------------------- |
| // Encoding support for column writer. |
| // This mirrors parquet-mr default encodings for writes. See: |
| // https://github.com/apache/parquet-mr/blob/master/parquet-column/src/main/java/org/apache/parquet/column/values/factory/DefaultV1ValuesWriterFactory.java |
| // https://github.com/apache/parquet-mr/blob/master/parquet-column/src/main/java/org/apache/parquet/column/values/factory/DefaultV2ValuesWriterFactory.java |
| |
| /// Trait to define default encoding for types, including whether or not the type |
| /// supports dictionary encoding. |
| trait EncodingWriteSupport { |
| /// Returns true if dictionary is supported for column writer, false otherwise. |
| fn has_dictionary_support(props: &WriterProperties) -> bool; |
| } |
| |
| /// Returns encoding for a column when no other encoding is provided in writer properties. |
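///
/// For example, under `WriterVersion::PARQUET_2_0` an INT32 column without an
/// explicit encoding falls back to `DELTA_BINARY_PACKED`, while under
/// `PARQUET_1_0` it falls back to `PLAIN`.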
| fn fallback_encoding(kind: Type, props: &WriterProperties) -> Encoding { |
| match (kind, props.writer_version()) { |
| (Type::BOOLEAN, WriterVersion::PARQUET_2_0) => Encoding::RLE, |
| (Type::INT32, WriterVersion::PARQUET_2_0) => Encoding::DELTA_BINARY_PACKED, |
| (Type::INT64, WriterVersion::PARQUET_2_0) => Encoding::DELTA_BINARY_PACKED, |
| (Type::BYTE_ARRAY, WriterVersion::PARQUET_2_0) => Encoding::DELTA_BYTE_ARRAY, |
| (Type::FIXED_LEN_BYTE_ARRAY, WriterVersion::PARQUET_2_0) => { |
| Encoding::DELTA_BYTE_ARRAY |
| } |
| _ => Encoding::PLAIN, |
| } |
| } |
| |
| /// Returns true if dictionary is supported for column writer, false otherwise. |
| fn has_dictionary_support(kind: Type, props: &WriterProperties) -> bool { |
| match (kind, props.writer_version()) { |
| // Booleans do not support dict encoding and should use a fallback encoding. |
| (Type::BOOLEAN, _) => false, |
        // Dictionary encoding of FIXED_LEN_BYTE_ARRAY is only supported starting from PARQUET 2.0
| (Type::FIXED_LEN_BYTE_ARRAY, WriterVersion::PARQUET_1_0) => false, |
| (Type::FIXED_LEN_BYTE_ARRAY, WriterVersion::PARQUET_2_0) => true, |
| _ => true, |
| } |
| } |
| |
| #[cfg(test)] |
| mod tests { |
| use rand::distributions::uniform::SampleUniform; |
| |
| use crate::column::{ |
| page::PageReader, |
| reader::{get_column_reader, get_typed_column_reader, ColumnReaderImpl}, |
| }; |
| use crate::file::{ |
| properties::WriterProperties, reader::SerializedPageReader, |
| writer::SerializedPageWriter, |
| }; |
| use crate::schema::types::{ColumnDescriptor, ColumnPath, Type as SchemaType}; |
| use crate::util::{ |
| io::{FileSink, FileSource}, |
| test_common::{get_temp_file, random_numbers_range}, |
| }; |
| |
| use super::*; |
| |
| #[test] |
| fn test_column_writer_inconsistent_def_rep_length() { |
| let page_writer = get_test_page_writer(); |
| let props = Arc::new(WriterProperties::builder().build()); |
| let mut writer = get_test_column_writer::<Int32Type>(page_writer, 1, 1, props); |
| let res = writer.write_batch(&[1, 2, 3, 4], Some(&[1, 1, 1]), Some(&[0, 0])); |
| assert!(res.is_err()); |
| if let Err(err) = res { |
| assert_eq!( |
| format!("{}", err), |
| "Parquet error: Inconsistent length of definition and repetition levels: 3 != 2" |
| ); |
| } |
| } |
| |
| #[test] |
| fn test_column_writer_invalid_def_levels() { |
| let page_writer = get_test_page_writer(); |
| let props = Arc::new(WriterProperties::builder().build()); |
| let mut writer = get_test_column_writer::<Int32Type>(page_writer, 1, 0, props); |
| let res = writer.write_batch(&[1, 2, 3, 4], None, None); |
| assert!(res.is_err()); |
| if let Err(err) = res { |
| assert_eq!( |
| format!("{}", err), |
| "Parquet error: Definition levels are required, because max definition level = 1" |
| ); |
| } |
| } |
| |
| #[test] |
| fn test_column_writer_invalid_rep_levels() { |
| let page_writer = get_test_page_writer(); |
| let props = Arc::new(WriterProperties::builder().build()); |
| let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 1, props); |
| let res = writer.write_batch(&[1, 2, 3, 4], None, None); |
| assert!(res.is_err()); |
| if let Err(err) = res { |
| assert_eq!( |
| format!("{}", err), |
| "Parquet error: Repetition levels are required, because max repetition level = 1" |
| ); |
| } |
| } |
| |
| #[test] |
| fn test_column_writer_not_enough_values_to_write() { |
| let page_writer = get_test_page_writer(); |
| let props = Arc::new(WriterProperties::builder().build()); |
| let mut writer = get_test_column_writer::<Int32Type>(page_writer, 1, 0, props); |
| let res = writer.write_batch(&[1, 2], Some(&[1, 1, 1, 1]), None); |
| assert!(res.is_err()); |
| if let Err(err) = res { |
| assert_eq!( |
| format!("{}", err), |
| "Parquet error: Expected to write 4 values, but have only 2" |
| ); |
| } |
| } |
| |
| #[test] |
| #[should_panic(expected = "Dictionary offset is already set")] |
| fn test_column_writer_write_only_one_dictionary_page() { |
| let page_writer = get_test_page_writer(); |
| let props = Arc::new(WriterProperties::builder().build()); |
| let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props); |
| writer.write_batch(&[1, 2, 3, 4], None, None).unwrap(); |
| // First page should be correctly written. |
| let res = writer.write_dictionary_page(); |
| assert!(res.is_ok()); |
| writer.write_dictionary_page().unwrap(); |
| } |
| |
| #[test] |
| fn test_column_writer_error_when_writing_disabled_dictionary() { |
| let page_writer = get_test_page_writer(); |
| let props = Arc::new( |
| WriterProperties::builder() |
| .set_dictionary_enabled(false) |
| .build(), |
| ); |
| let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props); |
| writer.write_batch(&[1, 2, 3, 4], None, None).unwrap(); |
| let res = writer.write_dictionary_page(); |
| assert!(res.is_err()); |
| if let Err(err) = res { |
| assert_eq!( |
| format!("{}", err), |
| "Parquet error: Dictionary encoder is not set" |
| ); |
| } |
| } |
| |
| #[test] |
| fn test_column_writer_boolean_type_does_not_support_dictionary() { |
| let page_writer = get_test_page_writer(); |
| let props = Arc::new( |
| WriterProperties::builder() |
| .set_dictionary_enabled(true) |
| .build(), |
| ); |
| let mut writer = get_test_column_writer::<BoolType>(page_writer, 0, 0, props); |
| writer |
| .write_batch(&[true, false, true, false], None, None) |
| .unwrap(); |
| |
| let (bytes_written, rows_written, metadata) = writer.close().unwrap(); |
| // PlainEncoder uses bit writer to write boolean values, which all fit into 1 |
| // byte. |
| assert_eq!(bytes_written, 1); |
| assert_eq!(rows_written, 4); |
| assert_eq!(metadata.encodings(), &vec![Encoding::PLAIN, Encoding::RLE]); |
| assert_eq!(metadata.num_values(), 4); // just values |
| assert_eq!(metadata.dictionary_page_offset(), None); |
| } |
| |
| #[test] |
| fn test_column_writer_default_encoding_support_bool() { |
| check_encoding_write_support::<BoolType>( |
| WriterVersion::PARQUET_1_0, |
| true, |
| &[true, false], |
| None, |
| &[Encoding::PLAIN, Encoding::RLE], |
| ); |
| check_encoding_write_support::<BoolType>( |
| WriterVersion::PARQUET_1_0, |
| false, |
| &[true, false], |
| None, |
| &[Encoding::PLAIN, Encoding::RLE], |
| ); |
| check_encoding_write_support::<BoolType>( |
| WriterVersion::PARQUET_2_0, |
| true, |
| &[true, false], |
| None, |
| &[Encoding::RLE, Encoding::RLE], |
| ); |
| check_encoding_write_support::<BoolType>( |
| WriterVersion::PARQUET_2_0, |
| false, |
| &[true, false], |
| None, |
| &[Encoding::RLE, Encoding::RLE], |
| ); |
| } |
| |
| #[test] |
| fn test_column_writer_default_encoding_support_int32() { |
| check_encoding_write_support::<Int32Type>( |
| WriterVersion::PARQUET_1_0, |
| true, |
| &[1, 2], |
| Some(0), |
| &[Encoding::PLAIN, Encoding::RLE_DICTIONARY, Encoding::RLE], |
| ); |
| check_encoding_write_support::<Int32Type>( |
| WriterVersion::PARQUET_1_0, |
| false, |
| &[1, 2], |
| None, |
| &[Encoding::PLAIN, Encoding::RLE], |
| ); |
| check_encoding_write_support::<Int32Type>( |
| WriterVersion::PARQUET_2_0, |
| true, |
| &[1, 2], |
| Some(0), |
| &[Encoding::PLAIN, Encoding::RLE_DICTIONARY, Encoding::RLE], |
| ); |
| check_encoding_write_support::<Int32Type>( |
| WriterVersion::PARQUET_2_0, |
| false, |
| &[1, 2], |
| None, |
| &[Encoding::DELTA_BINARY_PACKED, Encoding::RLE], |
| ); |
| } |
| |
| #[test] |
| fn test_column_writer_default_encoding_support_int64() { |
| check_encoding_write_support::<Int64Type>( |
| WriterVersion::PARQUET_1_0, |
| true, |
| &[1, 2], |
| Some(0), |
| &[Encoding::PLAIN, Encoding::RLE_DICTIONARY, Encoding::RLE], |
| ); |
| check_encoding_write_support::<Int64Type>( |
| WriterVersion::PARQUET_1_0, |
| false, |
| &[1, 2], |
| None, |
| &[Encoding::PLAIN, Encoding::RLE], |
| ); |
| check_encoding_write_support::<Int64Type>( |
| WriterVersion::PARQUET_2_0, |
| true, |
| &[1, 2], |
| Some(0), |
| &[Encoding::PLAIN, Encoding::RLE_DICTIONARY, Encoding::RLE], |
| ); |
| check_encoding_write_support::<Int64Type>( |
| WriterVersion::PARQUET_2_0, |
| false, |
| &[1, 2], |
| None, |
| &[Encoding::DELTA_BINARY_PACKED, Encoding::RLE], |
| ); |
| } |
| |
| #[test] |
| fn test_column_writer_default_encoding_support_int96() { |
| check_encoding_write_support::<Int96Type>( |
| WriterVersion::PARQUET_1_0, |
| true, |
| &[Int96::from(vec![1, 2, 3])], |
| Some(0), |
| &[Encoding::PLAIN, Encoding::RLE_DICTIONARY, Encoding::RLE], |
| ); |
| check_encoding_write_support::<Int96Type>( |
| WriterVersion::PARQUET_1_0, |
| false, |
| &[Int96::from(vec![1, 2, 3])], |
| None, |
| &[Encoding::PLAIN, Encoding::RLE], |
| ); |
| check_encoding_write_support::<Int96Type>( |
| WriterVersion::PARQUET_2_0, |
| true, |
| &[Int96::from(vec![1, 2, 3])], |
| Some(0), |
| &[Encoding::PLAIN, Encoding::RLE_DICTIONARY, Encoding::RLE], |
| ); |
| check_encoding_write_support::<Int96Type>( |
| WriterVersion::PARQUET_2_0, |
| false, |
| &[Int96::from(vec![1, 2, 3])], |
| None, |
| &[Encoding::PLAIN, Encoding::RLE], |
| ); |
| } |
| |
| #[test] |
| fn test_column_writer_default_encoding_support_float() { |
| check_encoding_write_support::<FloatType>( |
| WriterVersion::PARQUET_1_0, |
| true, |
| &[1.0, 2.0], |
| Some(0), |
| &[Encoding::PLAIN, Encoding::RLE_DICTIONARY, Encoding::RLE], |
| ); |
| check_encoding_write_support::<FloatType>( |
| WriterVersion::PARQUET_1_0, |
| false, |
| &[1.0, 2.0], |
| None, |
| &[Encoding::PLAIN, Encoding::RLE], |
| ); |
| check_encoding_write_support::<FloatType>( |
| WriterVersion::PARQUET_2_0, |
| true, |
| &[1.0, 2.0], |
| Some(0), |
| &[Encoding::PLAIN, Encoding::RLE_DICTIONARY, Encoding::RLE], |
| ); |
| check_encoding_write_support::<FloatType>( |
| WriterVersion::PARQUET_2_0, |
| false, |
| &[1.0, 2.0], |
| None, |
| &[Encoding::PLAIN, Encoding::RLE], |
| ); |
| } |
| |
| #[test] |
| fn test_column_writer_default_encoding_support_double() { |
| check_encoding_write_support::<DoubleType>( |
| WriterVersion::PARQUET_1_0, |
| true, |
| &[1.0, 2.0], |
| Some(0), |
| &[Encoding::PLAIN, Encoding::RLE_DICTIONARY, Encoding::RLE], |
| ); |
| check_encoding_write_support::<DoubleType>( |
| WriterVersion::PARQUET_1_0, |
| false, |
| &[1.0, 2.0], |
| None, |
| &[Encoding::PLAIN, Encoding::RLE], |
| ); |
| check_encoding_write_support::<DoubleType>( |
| WriterVersion::PARQUET_2_0, |
| true, |
| &[1.0, 2.0], |
| Some(0), |
| &[Encoding::PLAIN, Encoding::RLE_DICTIONARY, Encoding::RLE], |
| ); |
| check_encoding_write_support::<DoubleType>( |
| WriterVersion::PARQUET_2_0, |
| false, |
| &[1.0, 2.0], |
| None, |
| &[Encoding::PLAIN, Encoding::RLE], |
| ); |
| } |
| |
| #[test] |
| fn test_column_writer_default_encoding_support_byte_array() { |
| check_encoding_write_support::<ByteArrayType>( |
| WriterVersion::PARQUET_1_0, |
| true, |
| &[ByteArray::from(vec![1u8])], |
| Some(0), |
| &[Encoding::PLAIN, Encoding::RLE_DICTIONARY, Encoding::RLE], |
| ); |
| check_encoding_write_support::<ByteArrayType>( |
| WriterVersion::PARQUET_1_0, |
| false, |
| &[ByteArray::from(vec![1u8])], |
| None, |
| &[Encoding::PLAIN, Encoding::RLE], |
| ); |
| check_encoding_write_support::<ByteArrayType>( |
| WriterVersion::PARQUET_2_0, |
| true, |
| &[ByteArray::from(vec![1u8])], |
| Some(0), |
| &[Encoding::PLAIN, Encoding::RLE_DICTIONARY, Encoding::RLE], |
| ); |
| check_encoding_write_support::<ByteArrayType>( |
| WriterVersion::PARQUET_2_0, |
| false, |
| &[ByteArray::from(vec![1u8])], |
| None, |
| &[Encoding::DELTA_BYTE_ARRAY, Encoding::RLE], |
| ); |
| } |
| |
| #[test] |
| fn test_column_writer_default_encoding_support_fixed_len_byte_array() { |
| check_encoding_write_support::<FixedLenByteArrayType>( |
| WriterVersion::PARQUET_1_0, |
| true, |
| &[ByteArray::from(vec![1u8]).into()], |
| None, |
| &[Encoding::PLAIN, Encoding::RLE], |
| ); |
| check_encoding_write_support::<FixedLenByteArrayType>( |
| WriterVersion::PARQUET_1_0, |
| false, |
| &[ByteArray::from(vec![1u8]).into()], |
| None, |
| &[Encoding::PLAIN, Encoding::RLE], |
| ); |
| check_encoding_write_support::<FixedLenByteArrayType>( |
| WriterVersion::PARQUET_2_0, |
| true, |
| &[ByteArray::from(vec![1u8]).into()], |
| Some(0), |
| &[Encoding::PLAIN, Encoding::RLE_DICTIONARY, Encoding::RLE], |
| ); |
| check_encoding_write_support::<FixedLenByteArrayType>( |
| WriterVersion::PARQUET_2_0, |
| false, |
| &[ByteArray::from(vec![1u8]).into()], |
| None, |
| &[Encoding::DELTA_BYTE_ARRAY, Encoding::RLE], |
| ); |
| } |
| |
| #[test] |
| fn test_column_writer_check_metadata() { |
| let page_writer = get_test_page_writer(); |
| let props = Arc::new(WriterProperties::builder().build()); |
| let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props); |
| writer.write_batch(&[1, 2, 3, 4], None, None).unwrap(); |
| |
| let (bytes_written, rows_written, metadata) = writer.close().unwrap(); |
| assert_eq!(bytes_written, 20); |
| assert_eq!(rows_written, 4); |
| assert_eq!( |
| metadata.encodings(), |
| &vec![Encoding::PLAIN, Encoding::RLE_DICTIONARY, Encoding::RLE] |
| ); |
| assert_eq!(metadata.num_values(), 8); // dictionary + value indexes |
| assert_eq!(metadata.compressed_size(), 20); |
| assert_eq!(metadata.uncompressed_size(), 20); |
| assert_eq!(metadata.data_page_offset(), 0); |
| assert_eq!(metadata.dictionary_page_offset(), Some(0)); |
| if let Some(stats) = metadata.statistics() { |
| assert!(stats.has_min_max_set()); |
| assert_eq!(stats.null_count(), 0); |
| assert_eq!(stats.distinct_count(), None); |
| if let Statistics::Int32(stats) = stats { |
| assert_eq!(stats.min(), &1); |
| assert_eq!(stats.max(), &4); |
| } else { |
| panic!("expecting Statistics::Int32"); |
| } |
| } else { |
| panic!("metadata missing statistics"); |
| } |
| } |
| |
| #[test] |
| fn test_column_writer_precalculated_statistics() { |
| let page_writer = get_test_page_writer(); |
| let props = Arc::new(WriterProperties::builder().build()); |
| let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props); |
| writer |
| .write_batch_with_statistics( |
| &[1, 2, 3, 4], |
| None, |
| None, |
| &Some(-17), |
| &Some(9000), |
| Some(21), |
| Some(55), |
| ) |
| .unwrap(); |
| |
| let (bytes_written, rows_written, metadata) = writer.close().unwrap(); |
| assert_eq!(bytes_written, 20); |
| assert_eq!(rows_written, 4); |
| assert_eq!( |
| metadata.encodings(), |
| &vec![Encoding::PLAIN, Encoding::RLE_DICTIONARY, Encoding::RLE] |
| ); |
| assert_eq!(metadata.num_values(), 8); // dictionary + value indexes |
| assert_eq!(metadata.compressed_size(), 20); |
| assert_eq!(metadata.uncompressed_size(), 20); |
| assert_eq!(metadata.data_page_offset(), 0); |
| assert_eq!(metadata.dictionary_page_offset(), Some(0)); |
| if let Some(stats) = metadata.statistics() { |
| assert!(stats.has_min_max_set()); |
| assert_eq!(stats.null_count(), 21); |
| assert_eq!(stats.distinct_count().unwrap_or(0), 55); |
| if let Statistics::Int32(stats) = stats { |
| assert_eq!(stats.min(), &-17); |
| assert_eq!(stats.max(), &9000); |
| } else { |
| panic!("expecting Statistics::Int32"); |
| } |
| } else { |
| panic!("metadata missing statistics"); |
| } |
| } |
| |
| #[test] |
| fn test_column_writer_empty_column_roundtrip() { |
| let props = WriterProperties::builder().build(); |
| column_roundtrip::<Int32Type>("test_col_writer_rnd_1", props, &[], None, None); |
| } |
| |
| #[test] |
| fn test_column_writer_non_nullable_values_roundtrip() { |
| let props = WriterProperties::builder().build(); |
| column_roundtrip_random::<Int32Type>( |
| "test_col_writer_rnd_2", |
| props, |
| 1024, |
| std::i32::MIN, |
| std::i32::MAX, |
| 0, |
| 0, |
| ); |
| } |
| |
| #[test] |
| fn test_column_writer_nullable_non_repeated_values_roundtrip() { |
| let props = WriterProperties::builder().build(); |
| column_roundtrip_random::<Int32Type>( |
| "test_column_writer_nullable_non_repeated_values_roundtrip", |
| props, |
| 1024, |
| std::i32::MIN, |
| std::i32::MAX, |
| 10, |
| 0, |
| ); |
| } |
| |
| #[test] |
| fn test_column_writer_nullable_repeated_values_roundtrip() { |
| let props = WriterProperties::builder().build(); |
| column_roundtrip_random::<Int32Type>( |
| "test_col_writer_rnd_3", |
| props, |
| 1024, |
| std::i32::MIN, |
| std::i32::MAX, |
| 10, |
| 10, |
| ); |
| } |
| |
| #[test] |
| fn test_column_writer_dictionary_fallback_small_data_page() { |
| let props = WriterProperties::builder() |
| .set_dictionary_pagesize_limit(32) |
| .set_data_pagesize_limit(32) |
| .build(); |
| column_roundtrip_random::<Int32Type>( |
| "test_col_writer_rnd_4", |
| props, |
| 1024, |
| std::i32::MIN, |
| std::i32::MAX, |
| 10, |
| 10, |
| ); |
| } |
| |
| #[test] |
| fn test_column_writer_small_write_batch_size() { |
| for i in &[1usize, 2, 5, 10, 11, 1023] { |
| let props = WriterProperties::builder().set_write_batch_size(*i).build(); |
| |
| column_roundtrip_random::<Int32Type>( |
| "test_col_writer_rnd_5", |
| props, |
| 1024, |
| std::i32::MIN, |
| std::i32::MAX, |
| 10, |
| 10, |
| ); |
| } |
| } |
| |
| #[test] |
| fn test_column_writer_dictionary_disabled_v1() { |
| let props = WriterProperties::builder() |
| .set_writer_version(WriterVersion::PARQUET_1_0) |
| .set_dictionary_enabled(false) |
| .build(); |
| column_roundtrip_random::<Int32Type>( |
| "test_col_writer_rnd_6", |
| props, |
| 1024, |
| std::i32::MIN, |
| std::i32::MAX, |
| 10, |
| 10, |
| ); |
| } |
| |
| #[test] |
| fn test_column_writer_dictionary_disabled_v2() { |
| let props = WriterProperties::builder() |
| .set_writer_version(WriterVersion::PARQUET_2_0) |
| .set_dictionary_enabled(false) |
| .build(); |
| column_roundtrip_random::<Int32Type>( |
| "test_col_writer_rnd_7", |
| props, |
| 1024, |
| std::i32::MIN, |
| std::i32::MAX, |
| 10, |
| 10, |
| ); |
| } |
| |
| #[test] |
| fn test_column_writer_compression_v1() { |
| let props = WriterProperties::builder() |
| .set_writer_version(WriterVersion::PARQUET_1_0) |
| .set_compression(Compression::SNAPPY) |
| .build(); |
| column_roundtrip_random::<Int32Type>( |
| "test_col_writer_rnd_8", |
| props, |
| 2048, |
| std::i32::MIN, |
| std::i32::MAX, |
| 10, |
| 10, |
| ); |
| } |
| |
| #[test] |
| fn test_column_writer_compression_v2() { |
| let props = WriterProperties::builder() |
| .set_writer_version(WriterVersion::PARQUET_2_0) |
| .set_compression(Compression::SNAPPY) |
| .build(); |
| column_roundtrip_random::<Int32Type>( |
| "test_col_writer_rnd_9", |
| props, |
| 2048, |
| std::i32::MIN, |
| std::i32::MAX, |
| 10, |
| 10, |
| ); |
| } |
| |
| #[test] |
| fn test_column_writer_add_data_pages_with_dict() { |
| // ARROW-5129: Test verifies that we add data page in case of dictionary encoding |
| // and no fallback occurred so far. |
| let file = get_temp_file("test_column_writer_add_data_pages_with_dict", &[]); |
| let sink = FileSink::new(&file); |
| let page_writer = Box::new(SerializedPageWriter::new(sink)); |
| let props = Arc::new( |
| WriterProperties::builder() |
| .set_data_pagesize_limit(15) // actually each page will have size 15-18 bytes |
| .set_write_batch_size(3) // write 3 values at a time |
| .build(), |
| ); |
| let data = &[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]; |
| let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props); |
| writer.write_batch(data, None, None).unwrap(); |
| let (bytes_written, _, _) = writer.close().unwrap(); |
| |
| // Read pages and check the sequence |
| let source = FileSource::new(&file, 0, bytes_written as usize); |
| let mut page_reader = Box::new( |
| SerializedPageReader::new( |
| source, |
| data.len() as i64, |
| Compression::UNCOMPRESSED, |
| Int32Type::get_physical_type(), |
| ) |
| .unwrap(), |
| ); |
| let mut res = Vec::new(); |
| while let Some(page) = page_reader.get_next_page().unwrap() { |
| res.push((page.page_type(), page.num_values())); |
| } |
| assert_eq!( |
| res, |
| vec![ |
| (PageType::DICTIONARY_PAGE, 10), |
| (PageType::DATA_PAGE, 3), |
| (PageType::DATA_PAGE, 3), |
| (PageType::DATA_PAGE, 3), |
| (PageType::DATA_PAGE, 1) |
| ] |
| ); |
| } |
| |
| #[test] |
| fn test_float_statistics_nan_middle() { |
| let stats = statistics_roundtrip::<FloatType>(&[1.0, f32::NAN, 2.0]); |
| assert!(stats.has_min_max_set()); |
| if let Statistics::Float(stats) = stats { |
| assert_eq!(stats.min(), &1.0); |
| assert_eq!(stats.max(), &2.0); |
| } else { |
| panic!("expecting Statistics::Float"); |
| } |
| } |
| |
| #[test] |
| fn test_float_statistics_nan_start() { |
| let stats = statistics_roundtrip::<FloatType>(&[f32::NAN, 1.0, 2.0]); |
| assert!(stats.has_min_max_set()); |
| if let Statistics::Float(stats) = stats { |
| assert_eq!(stats.min(), &1.0); |
| assert_eq!(stats.max(), &2.0); |
| } else { |
| panic!("expecting Statistics::Float"); |
| } |
| } |
| |
| #[test] |
| fn test_float_statistics_nan_only() { |
| let stats = statistics_roundtrip::<FloatType>(&[f32::NAN, f32::NAN]); |
| assert!(!stats.has_min_max_set()); |
| assert!(matches!(stats, Statistics::Float(_))); |
| } |
| |
| #[test] |
| fn test_double_statistics_nan_middle() { |
| let stats = statistics_roundtrip::<DoubleType>(&[1.0, f64::NAN, 2.0]); |
| assert!(stats.has_min_max_set()); |
| if let Statistics::Double(stats) = stats { |
| assert_eq!(stats.min(), &1.0); |
| assert_eq!(stats.max(), &2.0); |
| } else { |
| panic!("expecting Statistics::Float"); |
| } |
| } |
| |
| #[test] |
| fn test_double_statistics_nan_start() { |
| let stats = statistics_roundtrip::<DoubleType>(&[f64::NAN, 1.0, 2.0]); |
| assert!(stats.has_min_max_set()); |
| if let Statistics::Double(stats) = stats { |
| assert_eq!(stats.min(), &1.0); |
| assert_eq!(stats.max(), &2.0); |
| } else { |
| panic!("expecting Statistics::Float"); |
| } |
| } |
| |
| #[test] |
| fn test_double_statistics_nan_only() { |
| let stats = statistics_roundtrip::<DoubleType>(&[f64::NAN, f64::NAN]); |
| assert!(!stats.has_min_max_set()); |
| assert!(matches!(stats, Statistics::Double(_))); |
| } |
| |
    /// Performs a write-read roundtrip with randomly generated values and levels.
    /// `max_size` is the maximum number of values or levels (if `max_def_level` > 0)
    /// to write for a column.
| fn column_roundtrip_random<T: DataType>( |
| file_name: &str, |
| props: WriterProperties, |
| max_size: usize, |
| min_value: T::T, |
| max_value: T::T, |
| max_def_level: i16, |
| max_rep_level: i16, |
| ) where |
| T::T: PartialOrd + SampleUniform + Copy, |
| { |
| let mut num_values: usize = 0; |
| |
| let mut buf: Vec<i16> = Vec::new(); |
| let def_levels = if max_def_level > 0 { |
| random_numbers_range(max_size, 0, max_def_level + 1, &mut buf); |
| for &dl in &buf[..] { |
| if dl == max_def_level { |
| num_values += 1; |
| } |
| } |
| Some(&buf[..]) |
| } else { |
| num_values = max_size; |
| None |
| }; |
| |
| let mut buf: Vec<i16> = Vec::new(); |
| let rep_levels = if max_rep_level > 0 { |
| random_numbers_range(max_size, 0, max_rep_level + 1, &mut buf); |
| Some(&buf[..]) |
| } else { |
| None |
| }; |
| |
| let mut values: Vec<T::T> = Vec::new(); |
| random_numbers_range(num_values, min_value, max_value, &mut values); |
| |
| column_roundtrip::<T>(file_name, props, &values[..], def_levels, rep_levels); |
| } |
| |
| /// Performs write-read roundtrip and asserts written values and levels. |
| fn column_roundtrip<'a, T: DataType>( |
| file_name: &'a str, |
| props: WriterProperties, |
| values: &[T::T], |
| def_levels: Option<&[i16]>, |
| rep_levels: Option<&[i16]>, |
| ) { |
| let file = get_temp_file(file_name, &[]); |
| let sink = FileSink::new(&file); |
| let page_writer = Box::new(SerializedPageWriter::new(sink)); |
| |
| let max_def_level = match def_levels { |
| Some(buf) => *buf.iter().max().unwrap_or(&0i16), |
| None => 0i16, |
| }; |
| |
| let max_rep_level = match rep_levels { |
| Some(buf) => *buf.iter().max().unwrap_or(&0i16), |
| None => 0i16, |
| }; |
| |
| let mut max_batch_size = values.len(); |
| if let Some(levels) = def_levels { |
| max_batch_size = cmp::max(max_batch_size, levels.len()); |
| } |
| if let Some(levels) = rep_levels { |
| max_batch_size = cmp::max(max_batch_size, levels.len()); |
| } |
| |
| let mut writer = get_test_column_writer::<T>( |
| page_writer, |
| max_def_level, |
| max_rep_level, |
| Arc::new(props), |
| ); |
| |
| let values_written = writer.write_batch(values, def_levels, rep_levels).unwrap(); |
| assert_eq!(values_written, values.len()); |
| let (bytes_written, rows_written, column_metadata) = writer.close().unwrap(); |
| |
| let source = FileSource::new(&file, 0, bytes_written as usize); |
| let page_reader = Box::new( |
| SerializedPageReader::new( |
| source, |
| column_metadata.num_values(), |
| column_metadata.compression(), |
| T::get_physical_type(), |
| ) |
| .unwrap(), |
| ); |
| let reader = |
| get_test_column_reader::<T>(page_reader, max_def_level, max_rep_level); |
| |
| let mut actual_values = vec![T::T::default(); max_batch_size]; |
| let mut actual_def_levels = def_levels.map(|_| vec![0i16; max_batch_size]); |
| let mut actual_rep_levels = rep_levels.map(|_| vec![0i16; max_batch_size]); |
| |
| let (values_read, levels_read) = read_fully( |
| reader, |
| max_batch_size, |
| actual_def_levels.as_mut(), |
| actual_rep_levels.as_mut(), |
| actual_values.as_mut_slice(), |
| ); |
| |
| // Assert values, definition and repetition levels. |
| |
| assert_eq!(&actual_values[..values_read], values); |
| match actual_def_levels { |
| Some(ref vec) => assert_eq!(Some(&vec[..levels_read]), def_levels), |
| None => assert_eq!(None, def_levels), |
| } |
| match actual_rep_levels { |
| Some(ref vec) => assert_eq!(Some(&vec[..levels_read]), rep_levels), |
| None => assert_eq!(None, rep_levels), |
| } |
| |
| // Assert written rows. |
| |
| if let Some(levels) = actual_rep_levels { |
| let mut actual_rows_written = 0; |
| for l in levels { |
| if l == 0 { |
| actual_rows_written += 1; |
| } |
| } |
| assert_eq!(actual_rows_written, rows_written); |
| } else if actual_def_levels.is_some() { |
| assert_eq!(levels_read as u64, rows_written); |
| } else { |
| assert_eq!(values_read as u64, rows_written); |
| } |
| } |
| |
    /// Writes the provided values and returns the resulting column chunk metadata.
    /// Used to test encoding support for the column writer.
| fn column_write_and_get_metadata<T: DataType>( |
| props: WriterProperties, |
| values: &[T::T], |
| ) -> ColumnChunkMetaData { |
| let page_writer = get_test_page_writer(); |
| let props = Arc::new(props); |
| let mut writer = get_test_column_writer::<T>(page_writer, 0, 0, props); |
| writer.write_batch(values, None, None).unwrap(); |
| let (_, _, metadata) = writer.close().unwrap(); |
| metadata |
| } |
| |
    // Helper function for `EncodingWriteSupport` tests. Checks the dictionary page
    // offset and the encodings in the column chunk metadata to make sure that the
    // column writer uses the encodings provided by the trait.
| fn check_encoding_write_support<T: DataType>( |
| version: WriterVersion, |
| dict_enabled: bool, |
| data: &[T::T], |
| dictionary_page_offset: Option<i64>, |
| encodings: &[Encoding], |
| ) { |
| let props = WriterProperties::builder() |
| .set_writer_version(version) |
| .set_dictionary_enabled(dict_enabled) |
| .build(); |
| let meta = column_write_and_get_metadata::<T>(props, data); |
| assert_eq!(meta.dictionary_page_offset(), dictionary_page_offset); |
| assert_eq!(meta.encodings(), &encodings); |
| } |
| |
    /// Reads one batch of data, assuming the batch is large enough to capture all
    /// of the values and levels.
| fn read_fully<T: DataType>( |
| mut reader: ColumnReaderImpl<T>, |
| batch_size: usize, |
| mut def_levels: Option<&mut Vec<i16>>, |
| mut rep_levels: Option<&mut Vec<i16>>, |
| values: &mut [T::T], |
| ) -> (usize, usize) { |
| let actual_def_levels = def_levels.as_mut().map(|vec| &mut vec[..]); |
| let actual_rep_levels = rep_levels.as_mut().map(|vec| &mut vec[..]); |
| reader |
| .read_batch(batch_size, actual_def_levels, actual_rep_levels, values) |
| .unwrap() |
| } |
| |
| /// Returns column writer. |
| fn get_test_column_writer<T: DataType>( |
| page_writer: Box<dyn PageWriter>, |
| max_def_level: i16, |
| max_rep_level: i16, |
| props: WriterPropertiesPtr, |
| ) -> ColumnWriterImpl<T> { |
| let descr = Arc::new(get_test_column_descr::<T>(max_def_level, max_rep_level)); |
| let column_writer = get_column_writer(descr, props, page_writer); |
| get_typed_column_writer::<T>(column_writer) |
| } |
| |
| /// Returns column reader. |
| fn get_test_column_reader<T: DataType>( |
| page_reader: Box<dyn PageReader>, |
| max_def_level: i16, |
| max_rep_level: i16, |
| ) -> ColumnReaderImpl<T> { |
| let descr = Arc::new(get_test_column_descr::<T>(max_def_level, max_rep_level)); |
| let column_reader = get_column_reader(descr, page_reader); |
| get_typed_column_reader::<T>(column_reader) |
| } |
| |
| /// Returns descriptor for primitive column. |
| fn get_test_column_descr<T: DataType>( |
| max_def_level: i16, |
| max_rep_level: i16, |
| ) -> ColumnDescriptor { |
| let path = ColumnPath::from("col"); |
| let tpe = SchemaType::primitive_type_builder("col", T::get_physical_type()) |
| // length is set for "encoding support" tests for FIXED_LEN_BYTE_ARRAY type, |
| // it should be no-op for other types |
| .with_length(1) |
| .build() |
| .unwrap(); |
| ColumnDescriptor::new(Arc::new(tpe), max_def_level, max_rep_level, path) |
| } |
| |
    /// Returns a page writer that records page metrics without serializing pages.
| fn get_test_page_writer() -> Box<dyn PageWriter> { |
| Box::new(TestPageWriter {}) |
| } |
| |
| struct TestPageWriter {} |
| |
| impl PageWriter for TestPageWriter { |
| fn write_page(&mut self, page: CompressedPage) -> Result<PageWriteSpec> { |
| let mut res = PageWriteSpec::new(); |
| res.page_type = page.page_type(); |
| res.uncompressed_size = page.uncompressed_size(); |
| res.compressed_size = page.compressed_size(); |
| res.num_values = page.num_values(); |
| res.offset = 0; |
| res.bytes_written = page.data().len() as u64; |
| Ok(res) |
| } |
| |
| fn write_metadata(&mut self, _metadata: &ColumnChunkMetaData) -> Result<()> { |
| Ok(()) |
| } |
| |
| fn close(&mut self) -> Result<()> { |
| Ok(()) |
| } |
| } |
| |
    /// Writes data into parquet using [`get_test_page_writer`] and
    /// [`get_test_column_writer`] and returns the generated statistics.
| fn statistics_roundtrip<T: DataType>(values: &[<T as DataType>::T]) -> Statistics { |
| let page_writer = get_test_page_writer(); |
| let props = Arc::new(WriterProperties::builder().build()); |
| let mut writer = get_test_column_writer::<T>(page_writer, 0, 0, props); |
| writer.write_batch(values, None, None).unwrap(); |
| |
| let (_bytes_written, _rows_written, metadata) = writer.close().unwrap(); |
| if let Some(stats) = metadata.statistics() { |
| stats.clone() |
| } else { |
| panic!("metadata missing statistics"); |
| } |
| } |
| } |