blob: 6727589f17e4fb392091ef058e7a5b6a3ca1c221 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
use std::{cmp, mem};
use super::rle::{RleDecoder, RleEncoder};
use crate::basic::Encoding;
use crate::data_type::AsBytes;
use crate::errors::{ParquetError, Result};
use crate::util::{
bit_util::{ceil, log2, BitReader, BitWriter},
memory::ByteBufferPtr,
};
/// Computes max buffer size for level encoder/decoder based on encoding, max
/// repetition/definition level and number of total buffered values (includes null
/// values).
#[inline]
pub fn max_buffer_size(
encoding: Encoding,
max_level: i16,
num_buffered_values: usize,
) -> usize {
let bit_width = log2(max_level as u64 + 1) as u8;
match encoding {
Encoding::RLE => {
RleEncoder::max_buffer_size(bit_width, num_buffered_values)
+ RleEncoder::min_buffer_size(bit_width)
}
Encoding::BIT_PACKED => {
ceil((num_buffered_values * bit_width as usize) as i64, 8) as usize
}
_ => panic!("Unsupported encoding type {}", encoding),
}
}
/// Encoder for definition/repetition levels.
/// Currently only supports RLE and BIT_PACKED (dev/null) encoding, including v2.
pub enum LevelEncoder {
RLE(RleEncoder),
RLE_V2(RleEncoder),
BIT_PACKED(u8, BitWriter),
}
impl LevelEncoder {
/// Creates new level encoder based on encoding, max level and underlying byte buffer.
/// For bit packed encoding it is assumed that buffer is already allocated with
/// `levels::max_buffer_size` method.
///
/// Used to encode levels for Data Page v1.
///
/// Panics, if encoding is not supported.
pub fn v1(encoding: Encoding, max_level: i16, byte_buffer: Vec<u8>) -> Self {
let bit_width = log2(max_level as u64 + 1) as u8;
match encoding {
Encoding::RLE => LevelEncoder::RLE(RleEncoder::new_from_buf(
bit_width,
byte_buffer,
mem::size_of::<i32>(),
)),
Encoding::BIT_PACKED => {
// Here we set full byte buffer without adjusting for num_buffered_values,
// because byte buffer will already be allocated with size from
// `max_buffer_size()` method.
LevelEncoder::BIT_PACKED(
bit_width,
BitWriter::new_from_buf(byte_buffer, 0),
)
}
_ => panic!("Unsupported encoding type {}", encoding),
}
}
/// Creates new level encoder based on RLE encoding. Used to encode Data Page v2
/// repetition and definition levels.
pub fn v2(max_level: i16, byte_buffer: Vec<u8>) -> Self {
let bit_width = log2(max_level as u64 + 1) as u8;
LevelEncoder::RLE_V2(RleEncoder::new_from_buf(bit_width, byte_buffer, 0))
}
/// Put/encode levels vector into this level encoder.
/// Returns number of encoded values that are less than or equal to length of the
/// input buffer.
///
/// RLE and BIT_PACKED level encoders return Err() when internal buffer overflows or
/// flush fails.
#[inline]
pub fn put(&mut self, buffer: &[i16]) -> Result<usize> {
let mut num_encoded = 0;
match *self {
LevelEncoder::RLE(ref mut encoder)
| LevelEncoder::RLE_V2(ref mut encoder) => {
for value in buffer {
if !encoder.put(*value as u64)? {
return Err(general_err!("RLE buffer is full"));
}
num_encoded += 1;
}
encoder.flush()?;
}
LevelEncoder::BIT_PACKED(bit_width, ref mut encoder) => {
for value in buffer {
if !encoder.put_value(*value as u64, bit_width as usize) {
return Err(general_err!("Not enough bytes left"));
}
num_encoded += 1;
}
encoder.flush();
}
}
Ok(num_encoded)
}
/// Finalizes level encoder, flush all intermediate buffers and return resulting
/// encoded buffer. Returned buffer is already truncated to encoded bytes only.
#[inline]
pub fn consume(self) -> Result<Vec<u8>> {
match self {
LevelEncoder::RLE(encoder) => {
let mut encoded_data = encoder.consume()?;
// Account for the buffer offset
let encoded_len = encoded_data.len() - mem::size_of::<i32>();
let len = (encoded_len as i32).to_le();
let len_bytes = len.as_bytes();
encoded_data[0..len_bytes.len()].copy_from_slice(len_bytes);
Ok(encoded_data)
}
LevelEncoder::RLE_V2(encoder) => encoder.consume(),
LevelEncoder::BIT_PACKED(_, encoder) => Ok(encoder.consume()),
}
}
}
/// Decoder for definition/repetition levels.
/// Currently only supports RLE and BIT_PACKED encoding for Data Page v1 and
/// RLE for Data Page v2.
pub enum LevelDecoder {
RLE(Option<usize>, RleDecoder),
RLE_V2(Option<usize>, RleDecoder),
BIT_PACKED(Option<usize>, u8, BitReader),
}
impl LevelDecoder {
/// Creates new level decoder based on encoding and max definition/repetition level.
/// This method only initializes level decoder, `set_data` method must be called
/// before reading any value.
///
/// Used to encode levels for Data Page v1.
///
/// Panics if encoding is not supported
pub fn v1(encoding: Encoding, max_level: i16) -> Self {
let bit_width = log2(max_level as u64 + 1) as u8;
match encoding {
Encoding::RLE => LevelDecoder::RLE(None, RleDecoder::new(bit_width)),
Encoding::BIT_PACKED => {
LevelDecoder::BIT_PACKED(None, bit_width, BitReader::from(Vec::new()))
}
_ => panic!("Unsupported encoding type {}", encoding),
}
}
/// Creates new level decoder based on RLE encoding.
/// Used to decode Data Page v2 repetition and definition levels.
///
/// To set data for this decoder, use `set_data_range` method.
pub fn v2(max_level: i16) -> Self {
let bit_width = log2(max_level as u64 + 1) as u8;
LevelDecoder::RLE_V2(None, RleDecoder::new(bit_width))
}
/// Sets data for this level decoder, and returns total number of bytes set.
/// This is used for Data Page v1 levels.
///
/// `data` is encoded data as byte buffer, `num_buffered_values` represents total
/// number of values that is expected.
///
/// Both RLE and BIT_PACKED level decoders set `num_buffered_values` as total number
/// of values that they can return and track num values.
#[inline]
pub fn set_data(&mut self, num_buffered_values: usize, data: ByteBufferPtr) -> usize {
match *self {
LevelDecoder::RLE(ref mut num_values, ref mut decoder) => {
*num_values = Some(num_buffered_values);
let i32_size = mem::size_of::<i32>();
let data_size = read_num_bytes!(i32, i32_size, data.as_ref()) as usize;
decoder.set_data(data.range(i32_size, data_size));
i32_size + data_size
}
LevelDecoder::BIT_PACKED(ref mut num_values, bit_width, ref mut decoder) => {
*num_values = Some(num_buffered_values);
// Set appropriate number of bytes: if max size is larger than buffer -
// set full buffer
let num_bytes =
ceil((num_buffered_values * bit_width as usize) as i64, 8);
let data_size = cmp::min(num_bytes as usize, data.len());
decoder.reset(data.range(data.start(), data_size));
data_size
}
_ => panic!(),
}
}
/// Sets byte array explicitly when start position `start` and length `len` are known
/// in advance. Only supported by RLE level decoder and used for Data Page v2 levels.
/// Returns number of total bytes set for this decoder (len).
#[inline]
pub fn set_data_range(
&mut self,
num_buffered_values: usize,
data: &ByteBufferPtr,
start: usize,
len: usize,
) -> usize {
match *self {
LevelDecoder::RLE_V2(ref mut num_values, ref mut decoder) => {
decoder.set_data(data.range(start, len));
*num_values = Some(num_buffered_values);
len
}
_ => panic!(
"set_data_range() method is only supported by RLE v2 encoding type"
),
}
}
/// Returns true if data is set for decoder, false otherwise.
#[inline]
pub fn is_data_set(&self) -> bool {
match self {
LevelDecoder::RLE(ref num_values, _) => num_values.is_some(),
LevelDecoder::RLE_V2(ref num_values, _) => num_values.is_some(),
LevelDecoder::BIT_PACKED(ref num_values, ..) => num_values.is_some(),
}
}
/// Decodes values and puts them into `buffer`.
/// Returns number of values that were successfully decoded (less than or equal to
/// buffer length).
#[inline]
pub fn get(&mut self, buffer: &mut [i16]) -> Result<usize> {
assert!(self.is_data_set(), "No data set for decoding");
match *self {
LevelDecoder::RLE(ref mut num_values, ref mut decoder)
| LevelDecoder::RLE_V2(ref mut num_values, ref mut decoder) => {
// Max length we can read
let len = cmp::min(num_values.unwrap(), buffer.len());
let values_read = decoder.get_batch::<i16>(&mut buffer[0..len])?;
*num_values = num_values.map(|len| len - values_read);
Ok(values_read)
}
LevelDecoder::BIT_PACKED(ref mut num_values, bit_width, ref mut decoder) => {
// When extracting values from bit reader, it might return more values
// than left because of padding to a full byte, we use
// num_values to track precise number of values.
let len = cmp::min(num_values.unwrap(), buffer.len());
let values_read =
decoder.get_batch::<i16>(&mut buffer[..len], bit_width as usize);
*num_values = num_values.map(|len| len - values_read);
Ok(values_read)
}
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::util::test_common::random_numbers_range;
fn test_internal_roundtrip(enc: Encoding, levels: &[i16], max_level: i16, v2: bool) {
let size = max_buffer_size(enc, max_level, levels.len());
let mut encoder = if v2 {
LevelEncoder::v2(max_level, vec![0; size])
} else {
LevelEncoder::v1(enc, max_level, vec![0; size])
};
encoder.put(&levels).expect("put() should be OK");
let encoded_levels = encoder.consume().expect("consume() should be OK");
let byte_buf = ByteBufferPtr::new(encoded_levels);
let mut decoder;
if v2 {
decoder = LevelDecoder::v2(max_level);
decoder.set_data_range(levels.len(), &byte_buf, 0, byte_buf.len());
} else {
decoder = LevelDecoder::v1(enc, max_level);
decoder.set_data(levels.len(), byte_buf);
};
let mut buffer = vec![0; levels.len()];
let num_decoded = decoder.get(&mut buffer).expect("get() should be OK");
assert_eq!(num_decoded, levels.len());
assert_eq!(buffer, levels);
}
// Performs incremental read until all bytes are read
fn test_internal_roundtrip_incremental(
enc: Encoding,
levels: &[i16],
max_level: i16,
v2: bool,
) {
let size = max_buffer_size(enc, max_level, levels.len());
let mut encoder = if v2 {
LevelEncoder::v2(max_level, vec![0; size])
} else {
LevelEncoder::v1(enc, max_level, vec![0; size])
};
encoder.put(&levels).expect("put() should be OK");
let encoded_levels = encoder.consume().expect("consume() should be OK");
let byte_buf = ByteBufferPtr::new(encoded_levels);
let mut decoder;
if v2 {
decoder = LevelDecoder::v2(max_level);
decoder.set_data_range(levels.len(), &byte_buf, 0, byte_buf.len());
} else {
decoder = LevelDecoder::v1(enc, max_level);
decoder.set_data(levels.len(), byte_buf);
}
let mut buffer = vec![0; levels.len() * 2];
let mut total_decoded = 0;
let mut safe_stop = levels.len() * 2; // still terminate in case of issues in the code
while safe_stop > 0 {
safe_stop -= 1;
let num_decoded = decoder
.get(&mut buffer[total_decoded..total_decoded + 1])
.expect("get() should be OK");
if num_decoded == 0 {
break;
}
total_decoded += num_decoded;
}
assert!(
safe_stop > 0,
"Failed to read values incrementally, reached safe stop"
);
assert_eq!(total_decoded, levels.len());
assert_eq!(&buffer[0..levels.len()], levels);
}
// Tests encoding/decoding of values when output buffer is larger than number of
// encoded values
fn test_internal_roundtrip_underflow(
enc: Encoding,
levels: &[i16],
max_level: i16,
v2: bool,
) {
let size = max_buffer_size(enc, max_level, levels.len());
let mut encoder = if v2 {
LevelEncoder::v2(max_level, vec![0; size])
} else {
LevelEncoder::v1(enc, max_level, vec![0; size])
};
// Encode only one value
let num_encoded = encoder.put(&levels[0..1]).expect("put() should be OK");
let encoded_levels = encoder.consume().expect("consume() should be OK");
assert_eq!(num_encoded, 1);
let byte_buf = ByteBufferPtr::new(encoded_levels);
let mut decoder;
// Set one encoded value as `num_buffered_values`
if v2 {
decoder = LevelDecoder::v2(max_level);
decoder.set_data_range(1, &byte_buf, 0, byte_buf.len());
} else {
decoder = LevelDecoder::v1(enc, max_level);
decoder.set_data(1, byte_buf);
}
let mut buffer = vec![0; levels.len()];
let num_decoded = decoder.get(&mut buffer).expect("get() should be OK");
assert_eq!(num_decoded, num_encoded);
assert_eq!(buffer[0..num_decoded], levels[0..num_decoded]);
}
// Tests when encoded values are larger than encoder's buffer
fn test_internal_roundtrip_overflow(
enc: Encoding,
levels: &[i16],
max_level: i16,
v2: bool,
) {
let size = max_buffer_size(enc, max_level, levels.len());
let mut encoder = if v2 {
LevelEncoder::v2(max_level, vec![0; size])
} else {
LevelEncoder::v1(enc, max_level, vec![0; size])
};
let mut found_err = false;
// Insert a large number of values, so we run out of space
for _ in 0..100 {
if let Err(err) = encoder.put(&levels) {
assert!(format!("{}", err).contains("Not enough bytes left"));
found_err = true;
break;
};
}
if !found_err {
panic!("Failed test: no buffer overflow");
}
}
#[test]
fn test_roundtrip_one() {
let levels = vec![0, 1, 1, 1, 1, 0, 0, 0, 0, 1];
let max_level = 1;
test_internal_roundtrip(Encoding::RLE, &levels, max_level, false);
test_internal_roundtrip(Encoding::BIT_PACKED, &levels, max_level, false);
test_internal_roundtrip(Encoding::RLE, &levels, max_level, true);
}
#[test]
fn test_roundtrip() {
let levels = vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9];
let max_level = 10;
test_internal_roundtrip(Encoding::RLE, &levels, max_level, false);
test_internal_roundtrip(Encoding::BIT_PACKED, &levels, max_level, false);
test_internal_roundtrip(Encoding::RLE, &levels, max_level, true);
}
#[test]
fn test_roundtrip_incremental() {
let levels = vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9];
let max_level = 10;
test_internal_roundtrip_incremental(Encoding::RLE, &levels, max_level, false);
test_internal_roundtrip_incremental(
Encoding::BIT_PACKED,
&levels,
max_level,
false,
);
test_internal_roundtrip_incremental(Encoding::RLE, &levels, max_level, true);
}
#[test]
fn test_roundtrip_all_zeros() {
let levels = vec![0, 0, 0, 0, 0, 0, 0, 0, 0, 0];
let max_level = 1;
test_internal_roundtrip(Encoding::RLE, &levels, max_level, false);
test_internal_roundtrip(Encoding::BIT_PACKED, &levels, max_level, false);
test_internal_roundtrip(Encoding::RLE, &levels, max_level, true);
}
#[test]
fn test_roundtrip_random() {
// This test is mainly for bit packed level encoder/decoder
let mut levels = Vec::new();
let max_level = 5;
random_numbers_range::<i16>(120, 0, max_level, &mut levels);
test_internal_roundtrip(Encoding::RLE, &levels, max_level, false);
test_internal_roundtrip(Encoding::BIT_PACKED, &levels, max_level, false);
test_internal_roundtrip(Encoding::RLE, &levels, max_level, true);
}
#[test]
fn test_roundtrip_underflow() {
let levels = vec![1, 1, 2, 3, 2, 1, 1, 2, 3, 1];
let max_level = 3;
test_internal_roundtrip_underflow(Encoding::RLE, &levels, max_level, false);
test_internal_roundtrip_underflow(
Encoding::BIT_PACKED,
&levels,
max_level,
false,
);
test_internal_roundtrip_underflow(Encoding::RLE, &levels, max_level, true);
}
#[test]
fn test_roundtrip_overflow() {
let levels = vec![1, 1, 2, 3, 2, 1, 1, 2, 3, 1];
let max_level = 3;
test_internal_roundtrip_overflow(Encoding::RLE, &levels, max_level, false);
test_internal_roundtrip_overflow(Encoding::BIT_PACKED, &levels, max_level, false);
test_internal_roundtrip_overflow(Encoding::RLE, &levels, max_level, true);
}
#[test]
fn test_rle_decoder_set_data_range() {
// Buffer containing both repetition and definition levels
let buffer = ByteBufferPtr::new(vec![5, 198, 2, 5, 42, 168, 10, 0, 2, 3, 36, 73]);
let max_rep_level = 1;
let mut decoder = LevelDecoder::v2(max_rep_level);
assert_eq!(decoder.set_data_range(10, &buffer, 0, 3), 3);
let mut result = vec![0; 10];
let num_decoded = decoder.get(&mut result).expect("get() should be OK");
assert_eq!(num_decoded, 10);
assert_eq!(result, vec![0, 1, 1, 0, 0, 0, 1, 1, 0, 1]);
let max_def_level = 2;
let mut decoder = LevelDecoder::v2(max_def_level);
assert_eq!(decoder.set_data_range(10, &buffer, 3, 5), 5);
let mut result = vec![0; 10];
let num_decoded = decoder.get(&mut result).expect("get() should be OK");
assert_eq!(num_decoded, 10);
assert_eq!(result, vec![2, 2, 2, 0, 0, 2, 2, 2, 2, 2]);
}
#[test]
#[should_panic(
expected = "set_data_range() method is only supported by RLE v2 encoding type"
)]
fn test_bit_packed_decoder_set_data_range() {
// Buffer containing both repetition and definition levels
let buffer = ByteBufferPtr::new(vec![1, 2, 3, 4, 5]);
let max_level = 1;
let mut decoder = LevelDecoder::v1(Encoding::BIT_PACKED, max_level);
decoder.set_data_range(10, &buffer, 0, 3);
}
#[test]
fn test_bit_packed_decoder_set_data() {
// Test the maximum size that is assigned based on number of values and buffer
// length
let buffer = ByteBufferPtr::new(vec![1, 2, 3, 4, 5]);
let max_level = 1;
let mut decoder = LevelDecoder::v1(Encoding::BIT_PACKED, max_level);
// This should reset to entire buffer
assert_eq!(decoder.set_data(1024, buffer.all()), buffer.len());
// This should set smallest num bytes
assert_eq!(decoder.set_data(3, buffer.all()), 1);
}
#[test]
#[should_panic(expected = "No data set for decoding")]
fn test_rle_level_decoder_get_no_set_data() {
// `get()` normally panics because bit_reader is not set for RLE decoding
// we have explicit check now in set_data
let max_rep_level = 2;
let mut decoder = LevelDecoder::v1(Encoding::RLE, max_rep_level);
let mut buffer = vec![0; 16];
decoder.get(&mut buffer).unwrap();
}
#[test]
#[should_panic(expected = "No data set for decoding")]
fn test_bit_packed_level_decoder_get_no_set_data() {
let max_rep_level = 2;
let mut decoder = LevelDecoder::v1(Encoding::BIT_PACKED, max_rep_level);
let mut buffer = vec![0; 16];
decoder.get(&mut buffer).unwrap();
}
}