| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| //! Contains all supported encoders for Parquet. |
| |
| use std::{cmp, io::Write, marker::PhantomData}; |
| |
| use crate::basic::*; |
| use crate::data_type::private::ParquetValueType; |
| use crate::data_type::*; |
| use crate::encodings::rle::RleEncoder; |
| use crate::errors::{ParquetError, Result}; |
| use crate::schema::types::ColumnDescPtr; |
| use crate::util::{ |
| bit_util::{self, log2, num_required_bits, BitWriter}, |
| hash_util, |
| memory::{Buffer, ByteBuffer, ByteBufferPtr, MemTrackerPtr}, |
| }; |
| |
| // ---------------------------------------------------------------------- |
| // Encoders |
| |
| /// An Parquet encoder for the data type `T`. |
| /// |
| /// Currently this allocates internal buffers for the encoded values. After done putting |
| /// values, caller should call `flush_buffer()` to get an immutable buffer pointer. |
| pub trait Encoder<T: DataType> { |
| /// Encodes data from `values`. |
| fn put(&mut self, values: &[T::T]) -> Result<()>; |
| |
| /// Encodes data from `values`, which contains spaces for null values, that is |
| /// identified by `valid_bits`. |
| /// |
| /// Returns the number of non-null values encoded. |
| fn put_spaced(&mut self, values: &[T::T], valid_bits: &[u8]) -> Result<usize> { |
| let num_values = values.len(); |
| let mut buffer = Vec::with_capacity(num_values); |
| // TODO: this is pretty inefficient. Revisit in future. |
| for i in 0..num_values { |
| if bit_util::get_bit(valid_bits, i) { |
| buffer.push(values[i].clone()); |
| } |
| } |
| self.put(&buffer[..])?; |
| Ok(buffer.len()) |
| } |
| |
| /// Returns the encoding type of this encoder. |
| fn encoding(&self) -> Encoding; |
| |
| /// Returns an estimate of the encoded data, in bytes. |
| /// Method call must be O(1). |
| fn estimated_data_encoded_size(&self) -> usize; |
| |
| /// Flushes the underlying byte buffer that's being processed by this encoder, and |
| /// return the immutable copy of it. This will also reset the internal state. |
| fn flush_buffer(&mut self) -> Result<ByteBufferPtr>; |
| } |
| |
| /// Gets a encoder for the particular data type `T` and encoding `encoding`. Memory usage |
| /// for the encoder instance is tracked by `mem_tracker`. |
| pub fn get_encoder<T: DataType>( |
| desc: ColumnDescPtr, |
| encoding: Encoding, |
| mem_tracker: MemTrackerPtr, |
| ) -> Result<Box<dyn Encoder<T>>> { |
| let encoder: Box<dyn Encoder<T>> = match encoding { |
| Encoding::PLAIN => Box::new(PlainEncoder::new(desc, mem_tracker, vec![])), |
| Encoding::RLE_DICTIONARY | Encoding::PLAIN_DICTIONARY => { |
| return Err(general_err!( |
| "Cannot initialize this encoding through this function" |
| )); |
| } |
| Encoding::RLE => Box::new(RleValueEncoder::new()), |
| Encoding::DELTA_BINARY_PACKED => Box::new(DeltaBitPackEncoder::new()), |
| Encoding::DELTA_LENGTH_BYTE_ARRAY => Box::new(DeltaLengthByteArrayEncoder::new()), |
| Encoding::DELTA_BYTE_ARRAY => Box::new(DeltaByteArrayEncoder::new()), |
| e => return Err(nyi_err!("Encoding {} is not supported", e)), |
| }; |
| Ok(encoder) |
| } |
| |
| // ---------------------------------------------------------------------- |
| // Plain encoding |
| |
/// Plain encoding that supports all types.
/// Values are encoded back to back.
/// The plain encoding is used whenever a more efficient encoding can not be used.
/// It stores the data in the following format:
/// - BOOLEAN - 1 bit per value, 0 is false; 1 is true.
/// - INT32 - 4 bytes per value, stored as little-endian.
/// - INT64 - 8 bytes per value, stored as little-endian.
/// - FLOAT - 4 bytes per value, stored as IEEE little-endian.
/// - DOUBLE - 8 bytes per value, stored as IEEE little-endian.
/// - BYTE_ARRAY - 4 byte length stored as little endian, followed by bytes.
/// - FIXED_LEN_BYTE_ARRAY - just the bytes are stored.
pub struct PlainEncoder<T: DataType> {
    // Byte-aligned encoded output accumulated so far.
    buffer: ByteBuffer,
    // Used for bit-packed values (booleans); flushed after `buffer` on flush.
    bit_writer: BitWriter,
    // Descriptor of the column being encoded.
    desc: ColumnDescPtr,
    _phantom: PhantomData<T>,
}
| |
| impl<T: DataType> PlainEncoder<T> { |
| /// Creates new plain encoder. |
| pub fn new(desc: ColumnDescPtr, mem_tracker: MemTrackerPtr, vec: Vec<u8>) -> Self { |
| let mut byte_buffer = ByteBuffer::new().with_mem_tracker(mem_tracker); |
| byte_buffer.set_data(vec); |
| Self { |
| buffer: byte_buffer, |
| bit_writer: BitWriter::new(256), |
| desc, |
| _phantom: PhantomData, |
| } |
| } |
| } |
| |
| impl<T: DataType> Encoder<T> for PlainEncoder<T> { |
| // Performance Note: |
| // As far as can be seen these functions are rarely called and as such we can hint to the |
| // compiler that they dont need to be folded into hot locations in the final output. |
| #[cold] |
| fn encoding(&self) -> Encoding { |
| Encoding::PLAIN |
| } |
| |
| fn estimated_data_encoded_size(&self) -> usize { |
| self.buffer.size() + self.bit_writer.bytes_written() |
| } |
| |
| #[inline] |
| fn flush_buffer(&mut self) -> Result<ByteBufferPtr> { |
| self.buffer.write_all(self.bit_writer.flush_buffer())?; |
| self.buffer.flush()?; |
| self.bit_writer.clear(); |
| |
| Ok(self.buffer.consume()) |
| } |
| |
| #[inline] |
| fn put(&mut self, values: &[T::T]) -> Result<()> { |
| T::T::encode(values, &mut self.buffer, &mut self.bit_writer)?; |
| Ok(()) |
| } |
| } |
| |
| // ---------------------------------------------------------------------- |
| // Dictionary encoding |
| |
// Initial number of slots in the dictionary hash table; doubled as load grows.
const INITIAL_HASH_TABLE_SIZE: usize = 1024;
// Maximum load factor before the hash table is resized.
const MAX_HASH_LOAD: f32 = 0.7;
// Sentinel stored in a hash slot that holds no dictionary index yet.
const HASH_SLOT_EMPTY: i32 = -1;
| |
/// Dictionary encoder.
/// The dictionary encoding builds a dictionary of values encountered in a given column.
/// The dictionary page is written first, before the data pages of the column chunk.
///
/// Dictionary page format: the entries in the dictionary - in dictionary order -
/// using the plain encoding.
///
/// Data page format: the bit width used to encode the entry ids stored as 1 byte
/// (max bit width = 32), followed by the values encoded using RLE/Bit packed described
/// above (with the given bit width).
pub struct DictEncoder<T: DataType> {
    // Descriptor for the column to be encoded.
    desc: ColumnDescPtr,

    // Size of the table. **Must be** a power of 2.
    hash_table_size: usize,

    // Store `hash_table_size` - 1, so that `j & mod_bitmask` is equivalent to
    // `j % hash_table_size`, but uses far fewer CPU cycles.
    mod_bitmask: u32,

    // Stores indices which map (many-to-one) to the values in the `uniques` array.
    // Here we are using fix-sized array with linear probing.
    // A slot with `HASH_SLOT_EMPTY` indicates the slot is not currently occupied.
    hash_slots: Buffer<i32>,

    // Indices that have not yet been written out by `write_indices()`.
    buffered_indices: Buffer<i32>,

    // The unique observed values, in insertion order (this is the dictionary).
    uniques: Buffer<T::T>,

    // Size in bytes needed to encode this dictionary.
    uniques_size_in_bytes: usize,

    // Tracking memory usage for the various data structures in this struct.
    mem_tracker: MemTrackerPtr,
}
| |
| impl<T: DataType> DictEncoder<T> { |
| /// Creates new dictionary encoder. |
| pub fn new(desc: ColumnDescPtr, mem_tracker: MemTrackerPtr) -> Self { |
| let mut slots = Buffer::new().with_mem_tracker(mem_tracker.clone()); |
| slots.resize(INITIAL_HASH_TABLE_SIZE, -1); |
| Self { |
| desc, |
| hash_table_size: INITIAL_HASH_TABLE_SIZE, |
| mod_bitmask: (INITIAL_HASH_TABLE_SIZE - 1) as u32, |
| hash_slots: slots, |
| buffered_indices: Buffer::new().with_mem_tracker(mem_tracker.clone()), |
| uniques: Buffer::new().with_mem_tracker(mem_tracker.clone()), |
| uniques_size_in_bytes: 0, |
| mem_tracker, |
| } |
| } |
| |
| /// Returns true if dictionary entries are sorted, false otherwise. |
| #[inline] |
| pub fn is_sorted(&self) -> bool { |
| // Sorting is not supported currently. |
| false |
| } |
| |
| /// Returns number of unique values (keys) in the dictionary. |
| pub fn num_entries(&self) -> usize { |
| self.uniques.size() |
| } |
| |
| /// Returns size of unique values (keys) in the dictionary, in bytes. |
| pub fn dict_encoded_size(&self) -> usize { |
| self.uniques_size_in_bytes |
| } |
| |
| /// Writes out the dictionary values with PLAIN encoding in a byte buffer, and return |
| /// the result. |
| #[inline] |
| pub fn write_dict(&self) -> Result<ByteBufferPtr> { |
| let mut plain_encoder = |
| PlainEncoder::<T>::new(self.desc.clone(), self.mem_tracker.clone(), vec![]); |
| plain_encoder.put(self.uniques.data())?; |
| plain_encoder.flush_buffer() |
| } |
| |
| /// Writes out the dictionary values with RLE encoding in a byte buffer, and return |
| /// the result. |
| pub fn write_indices(&mut self) -> Result<ByteBufferPtr> { |
| // TODO: the caller should allocate the buffer |
| let buffer_len = self.estimated_data_encoded_size(); |
| let mut buffer: Vec<u8> = vec![0; buffer_len as usize]; |
| buffer[0] = self.bit_width() as u8; |
| self.mem_tracker.alloc(buffer.capacity() as i64); |
| |
| // Write bit width in the first byte |
| buffer.write_all((self.bit_width() as u8).as_bytes())?; |
| let mut encoder = RleEncoder::new_from_buf(self.bit_width(), buffer, 1); |
| for index in self.buffered_indices.data() { |
| if !encoder.put(*index as u64)? { |
| return Err(general_err!("Encoder doesn't have enough space")); |
| } |
| } |
| self.buffered_indices.clear(); |
| Ok(ByteBufferPtr::new(encoder.consume()?)) |
| } |
| |
| #[inline] |
| #[allow(clippy::unnecessary_wraps)] |
| fn put_one(&mut self, value: &T::T) -> Result<()> { |
| let mut j = (hash_util::hash(value, 0) & self.mod_bitmask) as usize; |
| let mut index = self.hash_slots[j]; |
| |
| while index != HASH_SLOT_EMPTY && self.uniques[index as usize] != *value { |
| j += 1; |
| if j == self.hash_table_size { |
| j = 0; |
| } |
| index = self.hash_slots[j]; |
| } |
| |
| if index == HASH_SLOT_EMPTY { |
| index = self.insert_fresh_slot(j, value.clone()); |
| } |
| |
| self.buffered_indices.push(index); |
| Ok(()) |
| } |
| |
| #[inline(never)] |
| fn insert_fresh_slot(&mut self, slot: usize, value: T::T) -> i32 { |
| let index = self.uniques.size() as i32; |
| self.hash_slots[slot] = index; |
| |
| let (base_size, num_elements) = value.dict_encoding_size(); |
| |
| let unique_size = match T::get_physical_type() { |
| Type::BYTE_ARRAY => base_size + num_elements, |
| Type::FIXED_LEN_BYTE_ARRAY => self.desc.type_length() as usize, |
| _ => base_size, |
| }; |
| |
| self.uniques_size_in_bytes += unique_size; |
| self.uniques.push(value); |
| |
| if self.uniques.size() > (self.hash_table_size as f32 * MAX_HASH_LOAD) as usize { |
| self.double_table_size(); |
| } |
| |
| index |
| } |
| |
| #[inline] |
| fn bit_width(&self) -> u8 { |
| let num_entries = self.uniques.size(); |
| if num_entries == 0 { |
| 0 |
| } else if num_entries == 1 { |
| 1 |
| } else { |
| log2(num_entries as u64) as u8 |
| } |
| } |
| |
| fn double_table_size(&mut self) { |
| let new_size = self.hash_table_size * 2; |
| let mut new_hash_slots = Buffer::new().with_mem_tracker(self.mem_tracker.clone()); |
| new_hash_slots.resize(new_size, HASH_SLOT_EMPTY); |
| for i in 0..self.hash_table_size { |
| let index = self.hash_slots[i]; |
| if index == HASH_SLOT_EMPTY { |
| continue; |
| } |
| let value = &self.uniques[index as usize]; |
| let mut j = (hash_util::hash(value, 0) & ((new_size - 1) as u32)) as usize; |
| let mut slot = new_hash_slots[j]; |
| while slot != HASH_SLOT_EMPTY && self.uniques[slot as usize] != *value { |
| j += 1; |
| if j == new_size { |
| j = 0; |
| } |
| slot = new_hash_slots[j]; |
| } |
| |
| new_hash_slots[j] = index; |
| } |
| |
| self.hash_table_size = new_size; |
| self.mod_bitmask = (new_size - 1) as u32; |
| self.hash_slots = new_hash_slots; |
| } |
| } |
| |
| impl<T: DataType> Encoder<T> for DictEncoder<T> { |
| #[inline] |
| fn put(&mut self, values: &[T::T]) -> Result<()> { |
| for i in values { |
| self.put_one(&i)? |
| } |
| Ok(()) |
| } |
| |
| // Performance Note: |
| // As far as can be seen these functions are rarely called and as such we can hint to the |
| // compiler that they dont need to be folded into hot locations in the final output. |
| #[cold] |
| fn encoding(&self) -> Encoding { |
| Encoding::PLAIN_DICTIONARY |
| } |
| |
| #[inline] |
| fn estimated_data_encoded_size(&self) -> usize { |
| let bit_width = self.bit_width(); |
| 1 + RleEncoder::min_buffer_size(bit_width) |
| + RleEncoder::max_buffer_size(bit_width, self.buffered_indices.size()) |
| } |
| |
| #[inline] |
| fn flush_buffer(&mut self) -> Result<ByteBufferPtr> { |
| self.write_indices() |
| } |
| } |
| |
| // ---------------------------------------------------------------------- |
| // RLE encoding |
| |
// Initial byte capacity of the buffer backing `RleValueEncoder`.
const DEFAULT_RLE_BUFFER_LEN: usize = 1024;
| |
/// RLE/Bit-Packing hybrid encoding for values.
/// Currently is used only for data pages v2 and supports boolean types.
pub struct RleValueEncoder<T: DataType> {
    // Underlying RLE encoder; created lazily on the first `put` call,
    // so it is `None` until values have been provided.
    encoder: Option<RleEncoder>,
    _phantom: PhantomData<T>,
}
| |
| impl<T: DataType> RleValueEncoder<T> { |
| /// Creates new rle value encoder. |
| pub fn new() -> Self { |
| Self { |
| encoder: None, |
| _phantom: PhantomData, |
| } |
| } |
| } |
| |
| impl<T: DataType> Encoder<T> for RleValueEncoder<T> { |
| #[inline] |
| fn put(&mut self, values: &[T::T]) -> Result<()> { |
| ensure_phys_ty!(Type::BOOLEAN, "RleValueEncoder only supports BoolType"); |
| |
| if self.encoder.is_none() { |
| self.encoder = Some(RleEncoder::new(1, DEFAULT_RLE_BUFFER_LEN)); |
| } |
| let rle_encoder = self.encoder.as_mut().unwrap(); |
| for value in values { |
| let value = value.as_u64()?; |
| if !rle_encoder.put(value)? { |
| return Err(general_err!("RLE buffer is full")); |
| } |
| } |
| Ok(()) |
| } |
| |
| // Performance Note: |
| // As far as can be seen these functions are rarely called and as such we can hint to the |
| // compiler that they dont need to be folded into hot locations in the final output. |
| #[cold] |
| fn encoding(&self) -> Encoding { |
| Encoding::RLE |
| } |
| |
| #[inline] |
| fn estimated_data_encoded_size(&self) -> usize { |
| match self.encoder { |
| Some(ref enc) => enc.len(), |
| None => 0, |
| } |
| } |
| |
| #[inline] |
| fn flush_buffer(&mut self) -> Result<ByteBufferPtr> { |
| ensure_phys_ty!(Type::BOOLEAN, "RleValueEncoder only supports BoolType"); |
| let rle_encoder = self |
| .encoder |
| .as_mut() |
| .expect("RLE value encoder is not initialized"); |
| |
| // Flush all encoder buffers and raw values |
| let encoded_data = { |
| let buf = rle_encoder.flush_buffer()?; |
| |
| // Note that buf does not have any offset, all data is encoded bytes |
| let len = (buf.len() as i32).to_le(); |
| let len_bytes = len.as_bytes(); |
| let mut encoded_data = vec![]; |
| encoded_data.extend_from_slice(len_bytes); |
| encoded_data.extend_from_slice(buf); |
| encoded_data |
| }; |
| // Reset rle encoder for the next batch |
| rle_encoder.clear(); |
| |
| Ok(ByteBufferPtr::new(encoded_data)) |
| } |
| } |
| |
| // ---------------------------------------------------------------------- |
| // DELTA_BINARY_PACKED encoding |
| |
// Fixed capacity for the page header writer; chosen large enough to hold all
// header fields so header writes cannot fail.
const MAX_PAGE_HEADER_WRITER_SIZE: usize = 32;
// Capacity of the bit writer holding the encoded blocks.
const MAX_BIT_WRITER_SIZE: usize = 10 * 1024 * 1024;
// Number of values per block.
const DEFAULT_BLOCK_SIZE: usize = 128;
// Number of miniblocks each block is split into.
const DEFAULT_NUM_MINI_BLOCKS: usize = 4;
| |
/// Delta bit packed encoder.
/// Consists of a header followed by blocks of delta encoded values binary packed.
///
/// Delta-binary-packing:
/// ```shell
///   [page-header] [block 1], [block 2], ... [block N]
/// ```
///
/// Each page header consists of:
/// ```shell
///   [block size] [number of miniblocks in a block] [total value count] [first value]
/// ```
///
/// Each block consists of:
/// ```shell
///   [min delta] [list of bitwidths of miniblocks] [miniblocks]
/// ```
///
/// Current implementation writes values in `put` method, multiple calls to `put` to
/// existing block or start new block if block size is exceeded. Calling `flush_buffer`
/// writes out all data and resets internal state, including page header.
///
/// Supports only INT32 and INT64.
pub struct DeltaBitPackEncoder<T: DataType> {
    // Writer for the page header (block size, miniblock count, total values, first value).
    page_header_writer: BitWriter,
    // Writer for the encoded blocks themselves.
    bit_writer: BitWriter,
    // Count of all values put so far, including the first (unencoded) value.
    total_values: usize,
    // The first value, stored in the page header rather than as a delta.
    first_value: i64,
    // Last value seen; the next delta is computed against it.
    current_value: i64,
    // Values per block.
    block_size: usize,
    // Values per miniblock (`block_size / num_mini_blocks`).
    mini_block_size: usize,
    // Miniblocks per block.
    num_mini_blocks: usize,
    // Deltas accumulated for the current, not-yet-flushed block (at most `block_size`).
    values_in_block: usize,
    deltas: Vec<i64>,
    _phantom: PhantomData<T>,
}
| |
impl<T: DataType> DeltaBitPackEncoder<T> {
    /// Creates new delta bit packed encoder.
    ///
    /// Panics if the configured miniblock size is not a multiple of 8 (values
    /// must bit-pack into whole bytes) or if `T` is not INT32/INT64.
    pub fn new() -> Self {
        let block_size = DEFAULT_BLOCK_SIZE;
        let num_mini_blocks = DEFAULT_NUM_MINI_BLOCKS;
        let mini_block_size = block_size / num_mini_blocks;
        assert!(mini_block_size % 8 == 0);
        Self::assert_supported_type();

        DeltaBitPackEncoder {
            page_header_writer: BitWriter::new(MAX_PAGE_HEADER_WRITER_SIZE),
            bit_writer: BitWriter::new(MAX_BIT_WRITER_SIZE),
            total_values: 0,
            first_value: 0,
            current_value: 0, // current value to keep adding deltas
            block_size, // can write fewer values than block size for last block
            mini_block_size,
            num_mini_blocks,
            values_in_block: 0, // will be at most block_size
            deltas: vec![0; block_size],
            _phantom: PhantomData,
        }
    }

    /// Writes page header for blocks, this method is invoked when we are done encoding
    /// values. It is also okay to encode when no values have been provided
    fn write_page_header(&mut self) {
        // We ignore the result of each 'put' operation, because
        // MAX_PAGE_HEADER_WRITER_SIZE is chosen to fit all header values and
        // guarantees that writes will not fail.

        // Write the size of each block
        self.page_header_writer.put_vlq_int(self.block_size as u64);
        // Write the number of mini blocks
        self.page_header_writer
            .put_vlq_int(self.num_mini_blocks as u64);
        // Write the number of all values (including non-encoded first value)
        self.page_header_writer
            .put_vlq_int(self.total_values as u64);
        // Write first value
        self.page_header_writer.put_zigzag_vlq_int(self.first_value);
    }

    // Write current delta buffer (<= 'block size' values) into bit writer
    #[inline(never)]
    fn flush_block_values(&mut self) -> Result<()> {
        if self.values_in_block == 0 {
            return Ok(());
        }

        // The minimum delta over the block is subtracted from every delta so
        // all packed values are non-negative.
        let mut min_delta = i64::max_value();
        for i in 0..self.values_in_block {
            min_delta = cmp::min(min_delta, self.deltas[i]);
        }

        // Write min delta
        self.bit_writer.put_zigzag_vlq_int(min_delta);

        // Slice to store bit width for each mini block
        // (reserved now, filled in below once each miniblock's width is known)
        let offset = self.bit_writer.skip(self.num_mini_blocks)?;

        for i in 0..self.num_mini_blocks {
            // Find how many values we need to encode - either block size or whatever
            // values left
            let n = cmp::min(self.mini_block_size, self.values_in_block);
            if n == 0 {
                break;
            }

            // Compute the max delta in current mini block
            let mut max_delta = i64::min_value();
            for j in 0..n {
                max_delta =
                    cmp::max(max_delta, self.deltas[i * self.mini_block_size + j]);
            }

            // Compute bit width to store (max_delta - min_delta)
            let bit_width = num_required_bits(self.subtract_u64(max_delta, min_delta));
            self.bit_writer.write_at(offset + i, bit_width as u8);

            // Encode values in current mini block using min_delta and bit_width
            for j in 0..n {
                let packed_value = self
                    .subtract_u64(self.deltas[i * self.mini_block_size + j], min_delta);
                self.bit_writer.put_value(packed_value, bit_width);
            }

            // Pad the last block (n < mini_block_size)
            for _ in n..self.mini_block_size {
                self.bit_writer.put_value(0, bit_width);
            }

            self.values_in_block -= n;
        }

        assert!(
            self.values_in_block == 0,
            "Expected 0 values in block, found {}",
            self.values_in_block
        );
        Ok(())
    }
}
| |
// Implementation is shared between Int32Type and Int64Type,
// see `DeltaBitPackEncoderConversion` below for specifics.
impl<T: DataType> Encoder<T> for DeltaBitPackEncoder<T> {
    fn put(&mut self, values: &[T::T]) -> Result<()> {
        if values.is_empty() {
            return Ok(());
        }

        // Define values to encode, initialize state.
        // The very first value overall is stored in the page header, not as a
        // delta, so it is consumed here and the loop starts at index 1.
        let mut idx = if self.total_values == 0 {
            self.first_value = self.as_i64(values, 0);
            self.current_value = self.first_value;
            1
        } else {
            0
        };
        // Add all values (including first value)
        self.total_values += values.len();

        // Accumulate deltas, flushing a block each time it fills up.
        while idx < values.len() {
            let value = self.as_i64(values, idx);
            self.deltas[self.values_in_block] = self.subtract(value, self.current_value);
            self.current_value = value;
            idx += 1;
            self.values_in_block += 1;
            if self.values_in_block == self.block_size {
                self.flush_block_values()?;
            }
        }
        Ok(())
    }

    // Performance Note:
    // As far as can be seen these functions are rarely called and as such we can hint to the
    // compiler that they dont need to be folded into hot locations in the final output.
    #[cold]
    fn encoding(&self) -> Encoding {
        Encoding::DELTA_BINARY_PACKED
    }

    fn estimated_data_encoded_size(&self) -> usize {
        self.bit_writer.bytes_written()
    }

    fn flush_buffer(&mut self) -> Result<ByteBufferPtr> {
        // Write remaining values
        self.flush_block_values()?;
        // Write page header with total values
        self.write_page_header();

        // Output layout: page header first, then the encoded blocks.
        let mut buffer = ByteBuffer::new();
        buffer.write_all(self.page_header_writer.flush_buffer())?;
        buffer.write_all(self.bit_writer.flush_buffer())?;
        buffer.flush()?;

        // Reset state
        self.page_header_writer.clear();
        self.bit_writer.clear();
        self.total_values = 0;
        self.first_value = 0;
        self.current_value = 0;
        self.values_in_block = 0;

        Ok(buffer.consume())
    }
}
| |
/// Helper trait to define specific conversions and subtractions when computing deltas
trait DeltaBitPackEncoderConversion<T: DataType> {
    // Method should panic if type is not supported, otherwise no-op
    fn assert_supported_type();

    /// Converts `values[index]` to an `i64` for delta computation.
    fn as_i64(&self, values: &[T::T], index: usize) -> i64;

    /// Computes `left - right` with the wrapping semantics of `T`'s physical width.
    fn subtract(&self, left: i64, right: i64) -> i64;

    /// Computes `left - right` as an unsigned value suitable for bit packing.
    fn subtract_u64(&self, left: i64, right: i64) -> u64;
}
| |
impl<T: DataType> DeltaBitPackEncoderConversion<T> for DeltaBitPackEncoder<T> {
    #[inline]
    fn assert_supported_type() {
        ensure_phys_ty!(
            Type::INT32 | Type::INT64,
            "DeltaBitPackDecoder only supports Int32Type and Int64Type"
        );
    }

    #[inline]
    fn as_i64(&self, values: &[T::T], index: usize) -> i64 {
        values[index]
            .as_i64()
            .expect("DeltaBitPackDecoder only supports Int32Type and Int64Type")
    }

    #[inline]
    fn subtract(&self, left: i64, right: i64) -> i64 {
        // It is okay for values to overflow, wrapping_sub wrapping around at the boundary
        // (INT32 must wrap at 32-bit width, hence the narrowing casts).
        match T::get_physical_type() {
            Type::INT32 => (left as i32).wrapping_sub(right as i32) as i64,
            Type::INT64 => left.wrapping_sub(right),
            _ => panic!("DeltaBitPackDecoder only supports Int32Type and Int64Type"),
        }
    }

    #[inline]
    fn subtract_u64(&self, left: i64, right: i64) -> u64 {
        match T::get_physical_type() {
            // Conversion of i32 -> u32 -> u64 is to avoid non-zero left most bytes in int repr
            Type::INT32 => (left as i32).wrapping_sub(right as i32) as u32 as u64,
            Type::INT64 => left.wrapping_sub(right) as u64,
            _ => panic!("DeltaBitPackDecoder only supports Int32Type and Int64Type"),
        }
    }
}
| |
| // ---------------------------------------------------------------------- |
| // DELTA_LENGTH_BYTE_ARRAY encoding |
| |
/// Encoding for byte arrays to separate the length values and the data.
/// The lengths are encoded using DELTA_BINARY_PACKED encoding, data is
/// stored as raw bytes.
pub struct DeltaLengthByteArrayEncoder<T: DataType> {
    // length encoder (DELTA_BINARY_PACKED over the i32 lengths)
    len_encoder: DeltaBitPackEncoder<Int32Type>,
    // byte array data, buffered until `flush_buffer`
    data: Vec<ByteArray>,
    // data size in bytes of encoded values (sum of buffered array lengths)
    encoded_size: usize,
    _phantom: PhantomData<T>,
}
| |
| impl<T: DataType> DeltaLengthByteArrayEncoder<T> { |
| /// Creates new delta length byte array encoder. |
| pub fn new() -> Self { |
| Self { |
| len_encoder: DeltaBitPackEncoder::new(), |
| data: vec![], |
| encoded_size: 0, |
| _phantom: PhantomData, |
| } |
| } |
| } |
| |
| impl<T: DataType> Encoder<T> for DeltaLengthByteArrayEncoder<T> { |
| fn put(&mut self, values: &[T::T]) -> Result<()> { |
| ensure_phys_ty!( |
| Type::BYTE_ARRAY | Type::FIXED_LEN_BYTE_ARRAY, |
| "DeltaLengthByteArrayEncoder only supports ByteArrayType" |
| ); |
| |
| let val_it = || { |
| values |
| .iter() |
| .map(|x| x.as_any().downcast_ref::<ByteArray>().unwrap()) |
| }; |
| |
| let lengths: Vec<i32> = |
| val_it().map(|byte_array| byte_array.len() as i32).collect(); |
| self.len_encoder.put(&lengths)?; |
| for byte_array in val_it() { |
| self.encoded_size += byte_array.len(); |
| self.data.push(byte_array.clone()); |
| } |
| |
| Ok(()) |
| } |
| |
| // Performance Note: |
| // As far as can be seen these functions are rarely called and as such we can hint to the |
| // compiler that they dont need to be folded into hot locations in the final output. |
| #[cold] |
| fn encoding(&self) -> Encoding { |
| Encoding::DELTA_LENGTH_BYTE_ARRAY |
| } |
| |
| fn estimated_data_encoded_size(&self) -> usize { |
| self.len_encoder.estimated_data_encoded_size() + self.encoded_size |
| } |
| |
| fn flush_buffer(&mut self) -> Result<ByteBufferPtr> { |
| ensure_phys_ty!( |
| Type::BYTE_ARRAY | Type::FIXED_LEN_BYTE_ARRAY, |
| "DeltaLengthByteArrayEncoder only supports ByteArrayType" |
| ); |
| |
| let mut total_bytes = vec![]; |
| let lengths = self.len_encoder.flush_buffer()?; |
| total_bytes.extend_from_slice(lengths.data()); |
| self.data.iter().for_each(|byte_array| { |
| total_bytes.extend_from_slice(byte_array.data()); |
| }); |
| self.data.clear(); |
| self.encoded_size = 0; |
| |
| Ok(ByteBufferPtr::new(total_bytes)) |
| } |
| } |
| |
| // ---------------------------------------------------------------------- |
| // DELTA_BYTE_ARRAY encoding |
| |
/// Encoding for byte arrays, prefix lengths are encoded using DELTA_BINARY_PACKED
/// encoding, followed by suffixes with DELTA_LENGTH_BYTE_ARRAY encoding.
pub struct DeltaByteArrayEncoder<T: DataType> {
    // Encoder for the length of the prefix shared with the previous value.
    prefix_len_encoder: DeltaBitPackEncoder<Int32Type>,
    // Encoder for the non-shared suffix of each value.
    suffix_writer: DeltaLengthByteArrayEncoder<ByteArrayType>,
    // Bytes of the previously seen value, used to compute the next prefix length.
    previous: Vec<u8>,
    _phantom: PhantomData<T>,
}
| |
| impl<T: DataType> DeltaByteArrayEncoder<T> { |
| /// Creates new delta byte array encoder. |
| pub fn new() -> Self { |
| Self { |
| prefix_len_encoder: DeltaBitPackEncoder::new(), |
| suffix_writer: DeltaLengthByteArrayEncoder::new(), |
| previous: vec![], |
| _phantom: PhantomData, |
| } |
| } |
| } |
| |
| impl<T: DataType> Encoder<T> for DeltaByteArrayEncoder<T> { |
| fn put(&mut self, values: &[T::T]) -> Result<()> { |
| let mut prefix_lengths: Vec<i32> = vec![]; |
| let mut suffixes: Vec<ByteArray> = vec![]; |
| |
| let values = values.iter() |
| .map(|x| x.as_any()) |
| .map(|x| match T::get_physical_type() { |
| Type::BYTE_ARRAY => x.downcast_ref::<ByteArray>().unwrap(), |
| Type::FIXED_LEN_BYTE_ARRAY => x.downcast_ref::<FixedLenByteArray>().unwrap(), |
| _ => panic!( |
| "DeltaByteArrayEncoder only supports ByteArrayType and FixedLenByteArrayType" |
| ) |
| }); |
| |
| for byte_array in values { |
| let current = byte_array.data(); |
| // Maximum prefix length that is shared between previous value and current |
| // value |
| let prefix_len = cmp::min(self.previous.len(), current.len()); |
| let mut match_len = 0; |
| while match_len < prefix_len && self.previous[match_len] == current[match_len] |
| { |
| match_len += 1; |
| } |
| prefix_lengths.push(match_len as i32); |
| suffixes.push(byte_array.slice(match_len, byte_array.len() - match_len)); |
| // Update previous for the next prefix |
| self.previous.clear(); |
| self.previous.extend_from_slice(current); |
| } |
| self.prefix_len_encoder.put(&prefix_lengths)?; |
| self.suffix_writer.put(&suffixes)?; |
| |
| Ok(()) |
| } |
| |
| // Performance Note: |
| // As far as can be seen these functions are rarely called and as such we can hint to the |
| // compiler that they dont need to be folded into hot locations in the final output. |
| #[cold] |
| fn encoding(&self) -> Encoding { |
| Encoding::DELTA_BYTE_ARRAY |
| } |
| |
| fn estimated_data_encoded_size(&self) -> usize { |
| self.prefix_len_encoder.estimated_data_encoded_size() |
| + self.suffix_writer.estimated_data_encoded_size() |
| } |
| |
| fn flush_buffer(&mut self) -> Result<ByteBufferPtr> { |
| match T::get_physical_type() { |
| Type::BYTE_ARRAY | Type::FIXED_LEN_BYTE_ARRAY => { |
| // TODO: investigate if we can merge lengths and suffixes |
| // without copying data into new vector. |
| let mut total_bytes = vec![]; |
| // Insert lengths ... |
| let lengths = self.prefix_len_encoder.flush_buffer()?; |
| total_bytes.extend_from_slice(lengths.data()); |
| // ... followed by suffixes |
| let suffixes = self.suffix_writer.flush_buffer()?; |
| total_bytes.extend_from_slice(suffixes.data()); |
| |
| self.previous.clear(); |
| Ok(ByteBufferPtr::new(total_bytes)) |
| } |
| _ => panic!( |
| "DeltaByteArrayEncoder only supports ByteArrayType and FixedLenByteArrayType" |
| ) |
| } |
| } |
| } |
| |
| #[cfg(test)] |
| mod tests { |
| use super::*; |
| |
| use std::sync::Arc; |
| |
| use crate::decoding::{get_decoder, Decoder, DictDecoder, PlainDecoder}; |
| use crate::schema::types::{ |
| ColumnDescPtr, ColumnDescriptor, ColumnPath, Type as SchemaType, |
| }; |
| use crate::util::{ |
| memory::MemTracker, |
| test_common::{random_bytes, RandGen}, |
| }; |
| |
    // Number of random values generated per encode/decode round-trip test.
    const TEST_SET_SIZE: usize = 1024;
| |
    // Verifies `get_encoder` succeeds for supported encodings, errors for
    // dictionary encodings (which must be created elsewhere), and reports
    // not-yet-implemented for BIT_PACKED.
    #[test]
    fn test_get_encoders() {
        // supported encodings
        create_and_check_encoder::<Int32Type>(Encoding::PLAIN, None);
        create_and_check_encoder::<Int32Type>(Encoding::DELTA_BINARY_PACKED, None);
        create_and_check_encoder::<Int32Type>(Encoding::DELTA_LENGTH_BYTE_ARRAY, None);
        create_and_check_encoder::<Int32Type>(Encoding::DELTA_BYTE_ARRAY, None);
        create_and_check_encoder::<BoolType>(Encoding::RLE, None);

        // error when initializing
        create_and_check_encoder::<Int32Type>(
            Encoding::RLE_DICTIONARY,
            Some(general_err!(
                "Cannot initialize this encoding through this function"
            )),
        );
        create_and_check_encoder::<Int32Type>(
            Encoding::PLAIN_DICTIONARY,
            Some(general_err!(
                "Cannot initialize this encoding through this function"
            )),
        );

        // unsupported
        create_and_check_encoder::<Int32Type>(
            Encoding::BIT_PACKED,
            Some(nyi_err!("Encoding BIT_PACKED is not supported")),
        );
    }
| |
| #[test] |
| fn test_bool() { |
| BoolType::test(Encoding::PLAIN, TEST_SET_SIZE, -1); |
| BoolType::test(Encoding::PLAIN_DICTIONARY, TEST_SET_SIZE, -1); |
| BoolType::test(Encoding::RLE, TEST_SET_SIZE, -1); |
| } |
| |
| #[test] |
| fn test_i32() { |
| Int32Type::test(Encoding::PLAIN, TEST_SET_SIZE, -1); |
| Int32Type::test(Encoding::PLAIN_DICTIONARY, TEST_SET_SIZE, -1); |
| Int32Type::test(Encoding::DELTA_BINARY_PACKED, TEST_SET_SIZE, -1); |
| } |
| |
| #[test] |
| fn test_i64() { |
| Int64Type::test(Encoding::PLAIN, TEST_SET_SIZE, -1); |
| Int64Type::test(Encoding::PLAIN_DICTIONARY, TEST_SET_SIZE, -1); |
| Int64Type::test(Encoding::DELTA_BINARY_PACKED, TEST_SET_SIZE, -1); |
| } |
| |
| #[test] |
| fn test_i96() { |
| Int96Type::test(Encoding::PLAIN, TEST_SET_SIZE, -1); |
| Int96Type::test(Encoding::PLAIN_DICTIONARY, TEST_SET_SIZE, -1); |
| } |
| |
| #[test] |
| fn test_float() { |
| FloatType::test(Encoding::PLAIN, TEST_SET_SIZE, -1); |
| FloatType::test(Encoding::PLAIN_DICTIONARY, TEST_SET_SIZE, -1); |
| } |
| |
| #[test] |
| fn test_double() { |
| DoubleType::test(Encoding::PLAIN, TEST_SET_SIZE, -1); |
| DoubleType::test(Encoding::PLAIN_DICTIONARY, TEST_SET_SIZE, -1); |
| } |
| |
| #[test] |
| fn test_byte_array() { |
| ByteArrayType::test(Encoding::PLAIN, TEST_SET_SIZE, -1); |
| ByteArrayType::test(Encoding::PLAIN_DICTIONARY, TEST_SET_SIZE, -1); |
| ByteArrayType::test(Encoding::DELTA_LENGTH_BYTE_ARRAY, TEST_SET_SIZE, -1); |
| ByteArrayType::test(Encoding::DELTA_BYTE_ARRAY, TEST_SET_SIZE, -1); |
| } |
| |
| #[test] |
| fn test_fixed_lenbyte_array() { |
| FixedLenByteArrayType::test(Encoding::PLAIN, TEST_SET_SIZE, 100); |
| FixedLenByteArrayType::test(Encoding::PLAIN_DICTIONARY, TEST_SET_SIZE, 100); |
| FixedLenByteArrayType::test(Encoding::DELTA_BYTE_ARRAY, TEST_SET_SIZE, 100); |
| } |
| |
| #[test] |
| fn test_dict_encoded_size() { |
| fn run_test<T: DataType>( |
| type_length: i32, |
| values: &[T::T], |
| expected_size: usize, |
| ) { |
| let mut encoder = create_test_dict_encoder::<T>(type_length); |
| assert_eq!(encoder.dict_encoded_size(), 0); |
| encoder.put(values).unwrap(); |
| assert_eq!(encoder.dict_encoded_size(), expected_size); |
| // We do not reset encoded size of the dictionary keys after flush_buffer |
| encoder.flush_buffer().unwrap(); |
| assert_eq!(encoder.dict_encoded_size(), expected_size); |
| } |
| |
| // Only 2 variations of values 1 byte each |
| run_test::<BoolType>(-1, &[true, false, true, false, true], 2); |
| run_test::<Int32Type>(-1, &[1i32, 2i32, 3i32, 4i32, 5i32], 20); |
| run_test::<Int64Type>(-1, &[1i64, 2i64, 3i64, 4i64, 5i64], 40); |
| run_test::<FloatType>(-1, &[1f32, 2f32, 3f32, 4f32, 5f32], 20); |
| run_test::<DoubleType>(-1, &[1f64, 2f64, 3f64, 4f64, 5f64], 40); |
| // Int96: len + reference |
| run_test::<Int96Type>( |
| -1, |
| &[Int96::from(vec![1, 2, 3]), Int96::from(vec![2, 3, 4])], |
| 32, |
| ); |
| run_test::<ByteArrayType>( |
| -1, |
| &[ByteArray::from("abcd"), ByteArray::from("efj")], |
| 15, |
| ); |
| run_test::<FixedLenByteArrayType>( |
| 2, |
| &[ByteArray::from("ab").into(), ByteArray::from("bc").into()], |
| 4, |
| ); |
| } |
| |
    #[test]
    fn test_estimated_data_encoded_size() {
        // Checks `estimated_data_encoded_size()` at three lifecycle points:
        // right after construction (`initial_size`), after putting `values`
        // (`max_size`), and after `flush_buffer()` (`flush_size`).
        fn run_test<T: DataType>(
            encoding: Encoding,
            type_length: i32,
            values: &[T::T],
            initial_size: usize,
            max_size: usize,
            flush_size: usize,
        ) {
            // Dictionary encodings must be built via `DictEncoder` directly;
            // everything else goes through `get_encoder`.
            let mut encoder = match encoding {
                Encoding::PLAIN_DICTIONARY | Encoding::RLE_DICTIONARY => {
                    Box::new(create_test_dict_encoder::<T>(type_length))
                }
                _ => create_test_encoder::<T>(type_length, encoding),
            };
            assert_eq!(encoder.estimated_data_encoded_size(), initial_size);

            encoder.put(values).unwrap();
            assert_eq!(encoder.estimated_data_encoded_size(), max_size);

            encoder.flush_buffer().unwrap();
            assert_eq!(encoder.estimated_data_encoded_size(), flush_size);
        }

        // PLAIN: 1024 i32 values * 4 bytes each = 4096 bytes.
        run_test::<Int32Type>(Encoding::PLAIN, -1, &[123; 1024], 0, 4096, 0);

        // DICTIONARY
        // NOTE: The final size is almost the same because the dictionary entries are
        // preserved after encoded values have been written.
        // NOTE(review): `[123, 1024]` is a two-element slice (comma, not `;`),
        // i.e. two distinct dictionary entries — presumably intentional here,
        // unlike the 1024-element `[123; 1024]` slices above/below.
        run_test::<Int32Type>(Encoding::RLE_DICTIONARY, -1, &[123, 1024], 11, 68, 66);

        // DELTA_BINARY_PACKED
        run_test::<Int32Type>(Encoding::DELTA_BINARY_PACKED, -1, &[123; 1024], 0, 35, 0);

        // RLE: 16 `true`s followed by 16 `false`s — two runs.
        let mut values = vec![];
        values.extend_from_slice(&[true; 16]);
        values.extend_from_slice(&[false; 16]);
        run_test::<BoolType>(Encoding::RLE, -1, &values, 0, 2, 0);

        // DELTA_LENGTH_BYTE_ARRAY: "ab" + "abc" = 5 value bytes.
        run_test::<ByteArrayType>(
            Encoding::DELTA_LENGTH_BYTE_ARRAY,
            -1,
            &[ByteArray::from("ab"), ByteArray::from("abc")],
            0,
            5, // only value bytes, length encoder is not flushed yet
            0,
        );

        // DELTA_BYTE_ARRAY: "ab" + suffix "c" = 3 suffix bytes.
        run_test::<ByteArrayType>(
            Encoding::DELTA_BYTE_ARRAY,
            -1,
            &[ByteArray::from("ab"), ByteArray::from("abc")],
            0,
            3, // only suffix bytes, length encoder is not flushed yet
            0,
        );
    }
| |
| // See: https://github.com/sunchao/parquet-rs/issues/47 |
| #[test] |
| fn test_issue_47() { |
| let mut encoder = |
| create_test_encoder::<ByteArrayType>(0, Encoding::DELTA_BYTE_ARRAY); |
| let mut decoder = |
| create_test_decoder::<ByteArrayType>(0, Encoding::DELTA_BYTE_ARRAY); |
| |
| let mut input = vec![]; |
| input.push(ByteArray::from("aa")); |
| input.push(ByteArray::from("aaa")); |
| input.push(ByteArray::from("aa")); |
| input.push(ByteArray::from("aaa")); |
| let mut output = vec![ByteArray::default(); input.len()]; |
| |
| let mut result = |
| put_and_get(&mut encoder, &mut decoder, &input[..2], &mut output[..2]); |
| assert!( |
| result.is_ok(), |
| "first put_and_get() failed with: {}", |
| result.unwrap_err() |
| ); |
| result = put_and_get(&mut encoder, &mut decoder, &input[2..], &mut output[2..]); |
| assert!( |
| result.is_ok(), |
| "second put_and_get() failed with: {}", |
| result.unwrap_err() |
| ); |
| assert_eq!(output, input); |
| } |
| |
| trait EncodingTester<T: DataType> { |
| fn test(enc: Encoding, total: usize, type_length: i32) { |
| let result = match enc { |
| Encoding::PLAIN_DICTIONARY | Encoding::RLE_DICTIONARY => { |
| Self::test_dict_internal(total, type_length) |
| } |
| enc => Self::test_internal(enc, total, type_length), |
| }; |
| |
| assert!( |
| result.is_ok(), |
| "Expected result to be OK but got err:\n {}", |
| result.unwrap_err() |
| ); |
| } |
| |
| fn test_internal(enc: Encoding, total: usize, type_length: i32) -> Result<()>; |
| |
| fn test_dict_internal(total: usize, type_length: i32) -> Result<()>; |
| } |
| |
    impl<T: DataType + RandGen<T>> EncodingTester<T> for T {
        // Round-trips randomly generated values through an encoder/decoder
        // pair for a non-dictionary encoding. Covers three scenarios:
        // a spaced (nullable) put/get, a dense put/get, and a second dense
        // round-trip to verify the encoder is reusable after a flush.
        fn test_internal(enc: Encoding, total: usize, type_length: i32) -> Result<()> {
            let mut encoder = create_test_encoder::<T>(type_length, enc);
            let mut decoder = create_test_decoder::<T>(type_length, enc);
            let mut values = <T as RandGen<T>>::gen_vec(type_length, total);
            let mut result_data = vec![T::T::default(); total];

            // Test put/get spaced with a random validity bitmap.
            let num_bytes = bit_util::ceil(total as i64, 8);
            let valid_bits = random_bytes(num_bytes as usize);
            let values_written = encoder.put_spaced(&values[..], &valid_bits[..])?;
            let data = encoder.flush_buffer()?;
            decoder.set_data(data, values_written)?;
            // `values.len() - values_written` is the number of null slots.
            let _ = decoder.get_spaced(
                &mut result_data[..],
                values.len() - values_written,
                &valid_bits[..],
            )?;

            // Check equality: valid slots must round-trip; null slots must be
            // left at the default value.
            for i in 0..total {
                if bit_util::get_bit(&valid_bits[..], i) {
                    assert_eq!(result_data[i], values[i]);
                } else {
                    assert_eq!(result_data[i], T::T::default());
                }
            }

            let mut actual_total = put_and_get(
                &mut encoder,
                &mut decoder,
                &values[..],
                &mut result_data[..],
            )?;
            assert_eq!(actual_total, total);
            assert_eq!(result_data, values);

            // Encode more data after flush and test with decoder

            values = <T as RandGen<T>>::gen_vec(type_length, total);
            actual_total = put_and_get(
                &mut encoder,
                &mut decoder,
                &values[..],
                &mut result_data[..],
            )?;
            assert_eq!(actual_total, total);
            assert_eq!(result_data, values);

            Ok(())
        }

        // Round-trips randomly generated values through `DictEncoder` and a
        // dictionary decoder, including a second put/flush cycle to verify
        // the dictionary survives a flush of the data buffer.
        fn test_dict_internal(total: usize, type_length: i32) -> Result<()> {
            let mut encoder = create_test_dict_encoder::<T>(type_length);
            let mut values = <T as RandGen<T>>::gen_vec(type_length, total);
            encoder.put(&values[..])?;

            let mut data = encoder.flush_buffer()?;
            let mut decoder = create_test_dict_decoder::<T>();
            // The dictionary page itself is read with a plain decoder and
            // then handed to the dictionary decoder.
            let mut dict_decoder = PlainDecoder::<T>::new(type_length);
            dict_decoder.set_data(encoder.write_dict()?, encoder.num_entries())?;
            decoder.set_dict(Box::new(dict_decoder))?;
            let mut result_data = vec![T::T::default(); total];
            decoder.set_data(data, total)?;
            let mut actual_total = decoder.get(&mut result_data)?;

            assert_eq!(actual_total, total);
            assert_eq!(result_data, values);

            // Encode more data after flush and test with decoder

            values = <T as RandGen<T>>::gen_vec(type_length, total);
            encoder.put(&values[..])?;
            data = encoder.flush_buffer()?;

            let mut dict_decoder = PlainDecoder::<T>::new(type_length);
            dict_decoder.set_data(encoder.write_dict()?, encoder.num_entries())?;
            decoder.set_dict(Box::new(dict_decoder))?;
            decoder.set_data(data, total)?;
            actual_total = decoder.get(&mut result_data)?;

            assert_eq!(actual_total, total);
            assert_eq!(result_data, values);

            Ok(())
        }
    }
| |
| fn put_and_get<T: DataType>( |
| encoder: &mut Box<dyn Encoder<T>>, |
| decoder: &mut Box<dyn Decoder<T>>, |
| input: &[T::T], |
| output: &mut [T::T], |
| ) -> Result<usize> { |
| encoder.put(input)?; |
| let data = encoder.flush_buffer()?; |
| decoder.set_data(data, input.len())?; |
| decoder.get(output) |
| } |
| |
| fn create_and_check_encoder<T: DataType>( |
| encoding: Encoding, |
| err: Option<ParquetError>, |
| ) { |
| let descr = create_test_col_desc_ptr(-1, T::get_physical_type()); |
| let mem_tracker = Arc::new(MemTracker::new()); |
| let encoder = get_encoder::<T>(descr, encoding, mem_tracker); |
| match err { |
| Some(parquet_error) => { |
| assert!(encoder.is_err()); |
| assert_eq!(encoder.err().unwrap(), parquet_error); |
| } |
| None => { |
| assert!(encoder.is_ok()); |
| assert_eq!(encoder.unwrap().encoding(), encoding); |
| } |
| } |
| } |
| |
| // Creates test column descriptor. |
| fn create_test_col_desc_ptr(type_len: i32, t: Type) -> ColumnDescPtr { |
| let ty = SchemaType::primitive_type_builder("t", t) |
| .with_length(type_len) |
| .build() |
| .unwrap(); |
| Arc::new(ColumnDescriptor::new( |
| Arc::new(ty), |
| 0, |
| 0, |
| ColumnPath::new(vec![]), |
| )) |
| } |
| |
| fn create_test_encoder<T: DataType>( |
| type_len: i32, |
| enc: Encoding, |
| ) -> Box<dyn Encoder<T>> { |
| let desc = create_test_col_desc_ptr(type_len, T::get_physical_type()); |
| let mem_tracker = Arc::new(MemTracker::new()); |
| get_encoder(desc, enc, mem_tracker).unwrap() |
| } |
| |
| fn create_test_decoder<T: DataType>( |
| type_len: i32, |
| enc: Encoding, |
| ) -> Box<dyn Decoder<T>> { |
| let desc = create_test_col_desc_ptr(type_len, T::get_physical_type()); |
| get_decoder(desc, enc).unwrap() |
| } |
| |
| fn create_test_dict_encoder<T: DataType>(type_len: i32) -> DictEncoder<T> { |
| let desc = create_test_col_desc_ptr(type_len, T::get_physical_type()); |
| let mem_tracker = Arc::new(MemTracker::new()); |
| DictEncoder::<T>::new(desc, mem_tracker) |
| } |
| |
| fn create_test_dict_decoder<T: DataType>() -> DictDecoder<T> { |
| DictDecoder::<T>::new() |
| } |
| } |