blob: ab36eb2e9a481ef0904c9f2dd4ad3079cfa93a20 [file] [log] [blame]
/// Licensed to the Apache Software Foundation (ASF) under one
/// or more contributor license agreements. See the NOTICE file
/// distributed with this work for additional information
/// regarding copyright ownership. The ASF licenses this file
/// to you under the Apache License, Version 2.0 (the
/// "License"); you may not use this file except in compliance
/// with the License. You may obtain a copy of the License at
///
/// http://www.apache.org/licenses/LICENSE-2.0
///
/// Unless required by applicable law or agreed to in writing,
/// software distributed under the License is distributed on an
/// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
/// KIND, either express or implied. See the License for the
/// specific language governing permissions and limitations
/// under the License.
#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <fstream>
#include <iostream>
#include <limits>
#include <numeric>
#include <random>
#include <vector>
#include "gtest/gtest.h"
#include "exec/parquet/parquet-delta-decoder.h"
#include "exec/parquet/parquet-delta-encoder.h"
#include "exec/parquet/parquet-delta-coder-test-data.h"
#include "testutil/gtest-util.h"
#include "testutil/rand-util.h"
namespace impala {
template <typename INT_T>
class DeltaCoderTest {
public:
static std::vector<uint8_t> encode_all(const std::vector<INT_T>& plain,
const std::size_t block_size,
const std::size_t miniblock_size) {
const std::size_t miniblocks_in_block = block_size / miniblock_size;
DCHECK(block_size % miniblock_size == 0);
ParquetDeltaEncoder<INT_T> encoder;
Status init_success = encoder.Init(block_size, miniblocks_in_block);
EXPECT_OK(init_success);
std::vector<uint8_t> buffer(encoder.WorstCaseOutputSize(plain.size()), 0);
encoder.NewPage(buffer.data(), buffer.size());
for (INT_T value : plain) {
EXPECT_TRUE(encoder.Put(value));
}
int written_bytes = encoder.FinalizePage();
EXPECT_GT(written_bytes, 0);
buffer.resize(written_bytes);
return buffer;
}
static std::vector<uint8_t> decode_all(
const std::vector<uint8_t>& encoded, int stride) {
DCHECK_GE(stride, sizeof(INT_T));
ParquetDeltaDecoder<INT_T> decoder;
Status page_init_success = decoder.NewPage(encoded.data(), encoded.size());
EXPECT_OK(page_init_success);
const std::size_t total_value_count = decoder.GetTotalValueCount();
std::vector<uint8_t> result(total_value_count * stride, 0);
int decoded_values = decoder.NextValues(total_value_count, result.data(), stride);
EXPECT_EQ(total_value_count, decoded_values);
return result;
}
template <typename OutType>
static std::vector<uint8_t> decode_and_convert_all(
const std::vector<uint8_t>& encoded, int stride) {
DCHECK_GE(stride, sizeof(OutType));
ParquetDeltaDecoder<INT_T> decoder;
Status page_init_success = decoder.NewPage(encoded.data(), encoded.size());
EXPECT_OK(page_init_success);
const std::size_t total_value_count = decoder.GetTotalValueCount();
std::vector<uint8_t> result(total_value_count * stride, 0);
int decoded_values = decoder.template NextValuesConverted<OutType>(total_value_count,
result.data(), stride);
EXPECT_EQ(total_value_count, decoded_values);
return result;
}
static void expect_encoded_data(const std::vector<INT_T>& data,
const std::vector<uint8_t>& expected) {
const std::vector<uint8_t> encoded = encode_all(data, 128, 32);
EXPECT_EQ(expected, encoded);
}
static void expect_decoded_data(const std::vector<INT_T>& plain,
const std::vector<uint8_t>& encoded) {
constexpr int stride = sizeof(INT_T);
const std::vector<uint8_t> decoded_bytes = decode_all(encoded, stride);
std::vector<INT_T> decoded(decoded_bytes.size() / sizeof(INT_T));
memcpy(decoded.data(), decoded_bytes.data(), decoded_bytes.size());
EXPECT_EQ(plain, decoded);
}
template <typename OutType>
static void expect_equal_with_stride(const std::vector<INT_T>& plain,
const vector<uint8_t>& decoded, int stride) {
ASSERT_EQ(plain.size(), decoded.size() / stride);
ASSERT_EQ(0, decoded.size() % stride);
for (std::size_t i = 0; i < plain.size(); i++) {
const uint8_t* decoded_ptr = &decoded[i * stride];
const OutType decoded_value = *reinterpret_cast<const OutType*>(decoded_ptr);
const OutType plain_converted = static_cast<OutType>(plain[i]);
EXPECT_EQ(plain_converted, decoded_value) << "Index: " << i << ".";
}
}
template <typename OutType>
static void expect_decoded_data_converted(const std::vector<INT_T>& plain,
const std::vector<uint8_t>& encoded, int stride) {
DCHECK_GE(stride, sizeof(OutType));
const std::vector<uint8_t> decoded_bytes = decode_and_convert_all<OutType>(encoded,
stride);
expect_equal_with_stride<OutType>(plain, decoded_bytes, stride);
}
static void expect_decoded_data_one_by_one(const std::vector<INT_T>& plain,
const std::vector<uint8_t>& encoded) {
ParquetDeltaDecoder<INT_T> decoder;
Status page_init_success = decoder.NewPage(encoded.data(), encoded.size());
EXPECT_OK(page_init_success);
const std::size_t total_value_count = decoder.GetTotalValueCount();
EXPECT_EQ(plain.size(), total_value_count);
std::vector<uint8_t> decoded_bytes(total_value_count * sizeof(INT_T), 0);
for (std::size_t i = 0; i < total_value_count; i++) {
const std::size_t byte_index = i * sizeof(INT_T);
INT_T* const int_ptr = reinterpret_cast<INT_T*>(&decoded_bytes[byte_index]);
const int decoded_values = decoder.NextValue(int_ptr);
EXPECT_EQ(1, decoded_values);
}
std::vector<INT_T> decoded(decoded_bytes.size() / sizeof(INT_T));
memcpy(decoded.data(), decoded_bytes.data(), decoded_bytes.size());
EXPECT_EQ(plain, decoded);
}
};
/// Maps an integer type to the block-aligned test vectors of the matching width: 'plain'
/// refers to the unencoded values and 'encoded' to the corresponding encoded bytes. Only
/// the explicit specializations below are defined.
template <typename INT_T>
struct block_aligned_for_type;

/// Test vectors for 32-bit values.
template <>
struct block_aligned_for_type<int32_t> {
  static constexpr const std::vector<int32_t>& plain = block_aligned_plain;
  static constexpr const std::vector<uint8_t>& encoded = block_aligned_encoded;
};

/// Test vectors for 64-bit values.
template <>
struct block_aligned_for_type<int64_t> {
  static constexpr const std::vector<int64_t>& plain = block_aligned_64_bits_plain;
  static constexpr const std::vector<uint8_t>& encoded = block_aligned_64_bits_encoded;
};
/// Returns a copy of 'vec' in which every element has been widened to int64_t. The
/// conversion is value-preserving; an empty input yields an empty result.
std::vector<int64_t> convert_vec_to_int64_t(const std::vector<int32_t>& vec) {
  // The range constructor converts each element implicitly from int32_t to int64_t.
  return std::vector<int64_t>(vec.begin(), vec.end());
}
/// Encoder tests.
/// Exercises the parameter validation of ParquetDeltaEncoder::Init(): every invalid
/// combination has to be rejected, a valid one accepted, and a second initialisation of
/// an already initialised encoder refused.
TEST(ParquetDeltaEncoder, Initialize) {
  ParquetDeltaEncoder<int64_t> encoder;

  // Block size not divisible by 128.
  Status status = encoder.Init(64, 2);
  EXPECT_FALSE(status.ok());

  // Block size of 0.
  status = encoder.Init(0, 2);
  EXPECT_FALSE(status.ok());

  // Miniblock number not a divisor of block size.
  status = encoder.Init(128, 3);
  EXPECT_FALSE(status.ok());

  // Zero miniblocks in block.
  status = encoder.Init(128, 0);
  EXPECT_FALSE(status.ok());

  // Miniblock size not divisible by 32.
  status = encoder.Init(128, 8);
  EXPECT_FALSE(status.ok());

  // Zero values allowed in page.
  status = encoder.Init(256, 8, 0);
  EXPECT_FALSE(status.ok());

  // All correct.
  status = encoder.Init(256, 8);
  EXPECT_TRUE(status.ok());

  // Re-initialisation is not allowed.
  status = encoder.Init(256, 8);
  EXPECT_FALSE(status.ok());
}
// Golden-data encoder tests. Each test passes a plain vector and a pre-computed byte
// sequence to DeltaCoderTest::expect_encoded_data(), which encodes the plain values with
// block size 128 and miniblock size 32 and expects the output to equal the given bytes.
// The vectors are not defined in this file; presumably they come from
// parquet-delta-coder-test-data.h.
TEST(ParquetDeltaEncoder, HeaderOnly32) {
DeltaCoderTest<int32_t>::expect_encoded_data(header_only_data, header_only_expected);
}
// Same input widened to int64_t; the expected bytes are the same as for 32 bits.
TEST(ParquetDeltaEncoder, HeaderOnly64) {
DeltaCoderTest<int64_t>::expect_encoded_data(convert_vec_to_int64_t(header_only_data),
header_only_expected);
}
TEST(ParquetDeltaEncoder, ShortBlock32) {
DeltaCoderTest<int32_t>::expect_encoded_data(short_block_plain, short_block_encoded);
}
TEST(ParquetDeltaEncoder, BlockAligned32) {
DeltaCoderTest<int32_t>::expect_encoded_data(block_aligned_plain,
block_aligned_encoded);
}
// Uses the dedicated 64-bit vectors rather than widening the 32-bit ones.
TEST(ParquetDeltaEncoder, BlockAligned64) {
DeltaCoderTest<int64_t>::expect_encoded_data(block_aligned_64_bits_plain,
block_aligned_64_bits_encoded);
}
TEST(ParquetDeltaEncoder, BlockNotFullyWritten32) {
DeltaCoderTest<int32_t>::expect_encoded_data(block_not_fully_written_plain,
block_not_fully_written_encoded);
}
TEST(ParquetDeltaEncoder, MiniblockNotFullyWritten32) {
DeltaCoderTest<int32_t>::expect_encoded_data(miniblock_not_fully_written_plain,
miniblock_not_fully_written_encoded);
}
TEST(ParquetDeltaEncoder, NegativeDeltas32) {
DeltaCoderTest<int32_t>::expect_encoded_data(negative_deltas_plain,
negative_deltas_encoded);
}
TEST(ParquetDeltaEncoder, DeltasAreTheSame32) {
DeltaCoderTest<int32_t>::expect_encoded_data(deltas_are_the_same_plain,
deltas_are_the_same_encoded);
}
TEST(ParquetDeltaEncoder, ValuesAreTheSame32) {
DeltaCoderTest<int32_t>::expect_encoded_data(values_are_the_same_plain,
values_are_the_same_encoded);
}
TEST(ParquetDeltaEncoder, DeltaIsZeroForEachBlock32) {
DeltaCoderTest<int32_t>::expect_encoded_data(delta_is_zero_for_each_block_plain,
delta_is_zero_for_each_block_encoded);
}
TEST(ParquetDeltaEncoder, DataIsNotAlignedWithBlock32) {
DeltaCoderTest<int32_t>::expect_encoded_data(data_is_not_aligned_with_block_plain,
data_is_not_aligned_with_block_encoded);
}
TEST(ParquetDeltaEncoder, MaxMinValue32) {
DeltaCoderTest<int32_t>::expect_encoded_data(max_min_value_plain,
max_min_value_encoded);
}
template <class INT_T>
void PutAfterFinalizePageFails() {
constexpr int block_size = 128;
constexpr int miniblocks_in_block = 4;
ParquetDeltaEncoder<INT_T> encoder;
Status init_success = encoder.Init(block_size, miniblocks_in_block);
ASSERT_OK(init_success);
std::vector<INT_T> first_page = {1, 2};
// We should have room for at least a full block after we insert values and
// finalise the page.
const int values_reservation = block_size + first_page.size();
std::vector<uint8_t> buffer(encoder.WorstCaseOutputSize(values_reservation), 0);
encoder.NewPage(buffer.data(), buffer.size());
for (INT_T value : first_page) {
EXPECT_TRUE(encoder.Put(value));
}
// Finalize page.
int written_bytes = encoder.FinalizePage();
EXPECT_GT(written_bytes, 0);
// Try to insert a new value without starting a new page.
constexpr INT_T second_page_value = 1;
EXPECT_FALSE(encoder.Put(second_page_value));
// Check that only the legally inserted elements are present.
DeltaCoderTest<INT_T>::expect_decoded_data(first_page, buffer);
// Start a new page (we can overwrite the previous one now). After this the values
// should be inserted successfully.
encoder.NewPage(buffer.data(), buffer.size());
EXPECT_TRUE(encoder.Put(second_page_value));
// Finalize the second page.
written_bytes = encoder.FinalizePage();
EXPECT_GT(written_bytes, 0);
DeltaCoderTest<INT_T>::expect_decoded_data({second_page_value}, buffer);
}
TEST(ParquetDeltaEncoder, PutAfterFinalizePageFails32) {
PutAfterFinalizePageFails<int32_t>();
}
TEST(ParquetDeltaEncoder, PutAfterFinalizePageFails64) {
PutAfterFinalizePageFails<int64_t>();
}
template <class INT_T>
void PutFailureAtBufferEndLeavesCleanState() {
constexpr int block_size = 128;
constexpr int miniblocks_in_block = 4;
ParquetDeltaEncoder<INT_T> encoder;
Status init_success = encoder.Init(block_size, miniblocks_in_block);
ASSERT_OK(init_success);
const int buffer_len_for_block = encoder.WorstCaseOutputSize(block_size);
const int real_buffer_len = encoder.WorstCaseOutputSize(block_size + 10);
std::vector<INT_T> first_block;
// The first value is in the header, so we need one more to fill the block.
for (int i = 0; i < block_size + 1; i++) {
// We should have large deltas so no space remains in the block.
INT_T value = i % 2 == 0 ?
std::numeric_limits<INT_T>::max() : std::numeric_limits<INT_T>::min();
first_block.push_back(value);
}
std::vector<uint8_t> buffer(real_buffer_len);
encoder.NewPage(buffer.data(), buffer_len_for_block);
for (INT_T value : first_block) {
EXPECT_TRUE(encoder.Put(value));
}
// There is no space for more elements.
EXPECT_FALSE(encoder.Put(10));
// Finalize page.
int written_bytes = encoder.FinalizePage();
EXPECT_GT(written_bytes, 0);
// Check that only the legally inserted elements are present.
DeltaCoderTest<INT_T>::expect_decoded_data(first_block, buffer);
}
TEST(ParquetDeltaEncoder, PutFailureAtBufferEndLeavesCleanState32) {
PutFailureAtBufferEndLeavesCleanState<int32_t>();
}
TEST(ParquetDeltaEncoder, PutFailureAtBufferEndLeavesCleanState64) {
PutFailureAtBufferEndLeavesCleanState<int64_t>();
}
template <class INT_T>
void PutFailureMaxNumValuesLeavesCleanState() {
constexpr int block_size = 128;
constexpr int miniblocks_in_block = 4;
constexpr unsigned int max_page_value_count =
ParquetDeltaEncoder<INT_T>::DEFAULT_MAX_TOTAL_VALUE_COUNT;
ParquetDeltaEncoder<INT_T> encoder;
Status init_success = encoder.Init(block_size, miniblocks_in_block,
max_page_value_count);
ASSERT_OK(init_success);
const int buffer_len = encoder.WorstCaseOutputSize(max_page_value_count);
std::vector<uint8_t> buffer(buffer_len);
std::vector<INT_T> first_page(max_page_value_count, 0);
encoder.NewPage(buffer.data(), buffer.size());
for (INT_T value : first_page) {
EXPECT_TRUE(encoder.Put(value));
}
// There would be space for more elements because all values are the same, which takes
// up much less space than the worst case for which we have reservation; but the maximal
// number of values for a page has been reached.
EXPECT_FALSE(encoder.Put(0));
// Finalize page.
int written_bytes = encoder.FinalizePage();
EXPECT_GT(written_bytes, 0);
// Check that only the legally inserted elements are present.
DeltaCoderTest<INT_T>::expect_decoded_data(first_page, buffer);
}
TEST(ParquetDeltaEncoder, PutFailureMaxNumValuesLeavesCleanState32) {
PutFailureMaxNumValuesLeavesCleanState<int32_t>();
}
TEST(ParquetDeltaEncoder, PutFailureMaxNumValuesLeavesCleanState64) {
PutFailureMaxNumValuesLeavesCleanState<int64_t>();
}
template <class INT_T>
void StartNewPageWithoutFinalizeDiscardsValues() {
constexpr int block_size = 128;
constexpr int miniblocks_in_block = 4;
ParquetDeltaEncoder<INT_T> encoder;
Status init_success = encoder.Init(block_size, miniblocks_in_block);
ASSERT_OK(init_success);
constexpr int num_values = 5;
const int buffer_len = encoder.WorstCaseOutputSize(num_values);
std::vector<uint8_t> buffer(buffer_len);
const std::vector<INT_T> unwritten_page(num_values, 0);
encoder.NewPage(buffer.data(), buffer.size());
for (INT_T value : unwritten_page) {
EXPECT_TRUE(encoder.Put(value));
}
// We start a new page without finalising the previous one.
const std::vector<INT_T> written_page(num_values, 5);
encoder.NewPage(buffer.data(), buffer.size());
for (INT_T value : written_page) {
EXPECT_TRUE(encoder.Put(value));
}
// Finalize page.
int written_bytes = encoder.FinalizePage();
EXPECT_GT(written_bytes, 0);
// Check that only the elements from the second page elements are present.
DeltaCoderTest<INT_T>::expect_decoded_data(written_page, buffer);
}
TEST(ParquetDeltaEncoder, StartNewPageWithoutFinalizeDiscardsValues32) {
StartNewPageWithoutFinalizeDiscardsValues<int32_t>();
}
TEST(ParquetDeltaEncoder, StartNewPageWithoutFinalizeDiscardsValues64) {
StartNewPageWithoutFinalizeDiscardsValues<int64_t>();
}
/// Decoder tests.
// Golden-data decoder tests. Each test passes a plain vector and a pre-computed encoded
// byte sequence to DeltaCoderTest::expect_decoded_data(), which decodes the bytes in one
// batch with a stride of sizeof(INT_T) and expects the result to equal the plain values.
// The same vector pairs are used by the encoder tests in the opposite direction.
TEST(ParquetDeltaDecoder, HeaderOnly32) {
DeltaCoderTest<int32_t>::expect_decoded_data(header_only_data,
header_only_expected);
}
// Same encoded bytes as the 32-bit case, decoded into 64-bit values.
TEST(ParquetDeltaDecoder, HeaderOnly64) {
DeltaCoderTest<int64_t>::expect_decoded_data(convert_vec_to_int64_t(header_only_data),
header_only_expected);
}
TEST(ParquetDeltaDecoder, ShortBlock32) {
DeltaCoderTest<int32_t>::expect_decoded_data(short_block_plain,
short_block_encoded);
}
TEST(ParquetDeltaDecoder, BlockAligned32) {
DeltaCoderTest<int32_t>::expect_decoded_data(block_aligned_plain,
block_aligned_encoded);
}
TEST(ParquetDeltaDecoder, BlockNotFullyWritten32) {
DeltaCoderTest<int32_t>::expect_decoded_data(block_not_fully_written_plain,
block_not_fully_written_encoded);
}
TEST(ParquetDeltaDecoder, MiniblockNotFullyWritten32) {
DeltaCoderTest<int32_t>::expect_decoded_data(miniblock_not_fully_written_plain,
miniblock_not_fully_written_encoded);
}
TEST(ParquetDeltaDecoder, NegativeDeltas32) {
DeltaCoderTest<int32_t>::expect_decoded_data(negative_deltas_plain,
negative_deltas_encoded);
}
TEST(ParquetDeltaDecoder, DeltasAreTheSame32) {
DeltaCoderTest<int32_t>::expect_decoded_data(deltas_are_the_same_plain,
deltas_are_the_same_encoded);
}
TEST(ParquetDeltaDecoder, ValuesAreTheSame32) {
DeltaCoderTest<int32_t>::expect_decoded_data(values_are_the_same_plain,
values_are_the_same_encoded);
}
TEST(ParquetDeltaDecoder, DeltaIsZeroForEachBlock32) {
DeltaCoderTest<int32_t>::expect_decoded_data(delta_is_zero_for_each_block_plain,
delta_is_zero_for_each_block_encoded);
}
TEST(ParquetDeltaDecoder, DataIsNotAlignedWithBlock32) {
DeltaCoderTest<int32_t>::expect_decoded_data(data_is_not_aligned_with_block_plain,
data_is_not_aligned_with_block_encoded);
}
TEST(ParquetDeltaDecoder, MaxMinValue32) {
DeltaCoderTest<int32_t>::expect_decoded_data(max_min_value_plain,
max_min_value_encoded);
}
TEST(ParquetDeltaDecoder, BlockAligned64) {
DeltaCoderTest<int64_t>::expect_decoded_data(block_aligned_64_bits_plain,
block_aligned_64_bits_encoded);
}
// The two tests below decode value by value through NextValue() instead of in one batch.
TEST(ParquetDeltaDecoder, OneByOne32) {
DeltaCoderTest<int32_t>::expect_decoded_data_one_by_one(block_aligned_plain,
block_aligned_encoded);
}
TEST(ParquetDeltaDecoder, OneByOne64) {
DeltaCoderTest<int64_t>::expect_decoded_data_one_by_one(block_aligned_64_bits_plain,
block_aligned_64_bits_encoded);
}
void ConversionAndStride() {
// The INT64 Parquet type is only used with the BIGINT Impala type, so no conversion is
// possible.
using INT_T = int32_t;
const std::vector<INT_T>& plain = block_aligned_for_type<INT_T>::plain;
const std::vector<uint8_t>& encoded = block_aligned_for_type<INT_T>::encoded;
constexpr int extra_stride = 3;
DeltaCoderTest<INT_T>::template expect_decoded_data_converted<int8_t>(plain, encoded,
sizeof(int8_t));
DeltaCoderTest<INT_T>::template expect_decoded_data_converted<int8_t>(plain, encoded,
sizeof(int8_t) + extra_stride);
DeltaCoderTest<INT_T>::template expect_decoded_data_converted<int16_t>(plain, encoded,
sizeof(int16_t));
DeltaCoderTest<INT_T>::template expect_decoded_data_converted<int16_t>(plain, encoded,
sizeof(int16_t) + extra_stride);
DeltaCoderTest<INT_T>::template expect_decoded_data_converted<int64_t>(plain, encoded,
sizeof(int64_t));
DeltaCoderTest<INT_T>::template expect_decoded_data_converted<int64_t>(plain, encoded,
sizeof(int64_t) + extra_stride);
}
TEST(ParquetDeltaDecoder, ConversionAndStride) {
ConversionAndStride();
}
/// Wrong block and miniblock sizes and bitwidths.
template <typename INT_T>
void ExpectPageInitFails(const std::vector<uint8_t>& encoded) {
ParquetDeltaDecoder<INT_T> reader;
Status page_init_success = reader.NewPage(encoded.data(), encoded.size());
EXPECT_FALSE(page_init_success.ok());
}
/// A page whose header declares a block size of 140, which is not a multiple of 128, has
/// to be rejected by NewPage().
template <typename INT_T>
void BlockSizeNotMultipleOf128() {
  const std::vector<uint8_t> page_header = {0x8c, 0x01 /* 140 */, 0x04, 0x01, 0x04};
  ExpectPageInitFails<INT_T>(page_header);
}

TEST(ParquetDeltaDecoder, BlockSizeNotMultipleOf128_32) {
  BlockSizeNotMultipleOf128<int32_t>();
}

TEST(ParquetDeltaDecoder, BlockSizeNotMultipleOf128_64) {
  BlockSizeNotMultipleOf128<int64_t>();
}
/// A page whose header implies a miniblock size that is not a multiple of 32 has to be
/// rejected by NewPage().
template <typename INT_T>
void MiniblockSizeNotMultipleOf32() {
  const std::vector<uint8_t> page_header = {
      0x80, 0x01, 0x08 /* miniblock size is 16 */, 0x01, 0x04};
  ExpectPageInitFails<INT_T>(page_header);
}

TEST(ParquetDeltaDecoder, MiniblockSizeNotMultipleOf32_32) {
  MiniblockSizeNotMultipleOf32<int32_t>();
}

TEST(ParquetDeltaDecoder, MiniblockSizeNotMultipleOf32_64) {
  MiniblockSizeNotMultipleOf32<int64_t>();
}
/// A page whose header starts with a zero byte in the block size position has to be
/// rejected by NewPage().
template <typename INT_T>
void BlockSizeZero() {
  const std::vector<uint8_t> page_header = {0x00, 0x08, 0x01, 0x04};
  ExpectPageInitFails<INT_T>(page_header);
}

TEST(ParquetDeltaDecoder, BlockSizeZero32) {
  BlockSizeZero<int32_t>();
}

TEST(ParquetDeltaDecoder, BlockSizeZero64) {
  BlockSizeZero<int64_t>();
}
/// A page header with inconsistent block/miniblock fields has to be rejected by
/// NewPage(). NOTE(review): the first field decodes to 128 and the second to 256; going
/// by the other header tests the second field looks like the number of miniblocks per
/// block rather than a miniblock size - confirm against the decoder.
template <typename INT_T>
void MiniblockBiggerThanBlock() {
  const std::vector<uint8_t> page_header = {0x80, 0x01, 0x80, 0x02, 0x01, 0x04};
  ExpectPageInitFails<INT_T>(page_header);
}

TEST(ParquetDeltaDecoder, MiniblockBiggerThanBlock32) {
  MiniblockBiggerThanBlock<int32_t>();
}

TEST(ParquetDeltaDecoder, MiniblockBiggerThanBlock64) {
  MiniblockBiggerThanBlock<int64_t>();
}
template <typename INT_T>
void BitwidthTooBig() {
std::vector<uint8_t> encoded = {0x80, 0x01, 0x04, 0x02, 0x04, /// Header.
0x00, /// Min delta in block.
0x41, 0x10, 0x10, 0x10, // Bitwidths --- 0x41 == 65 is too big.
// Enough bytes for the values so that the buffer is not too short and the expected
// error comes from the wrong bitwidth.
0x33, 0x33, 0x33, 0x33, 0x33, 0x33, 0x33, 0x33, 0x33, 0x33, 0x33, 0x33,
0x33, 0x33, 0x33, 0x33, 0x33};
ParquetDeltaDecoder<INT_T> reader;
// The exact location where the reading fails is an implementation detail, but it should
// happen at the latest when trying to read the first value with the wrong bitwidth,
// which is the second value in total (the first value is in the header).
Status page_init_success = reader.NewPage(encoded.data(), encoded.size());
if (!page_init_success.ok()) return;
INT_T value;
// The first value is in the header.
bool first_value_success = reader.NextValue(&value);
if (!first_value_success) return;
// This is the value that has reported bitwidth 65.
bool second_value_success = reader.NextValue(&value);
EXPECT_FALSE(second_value_success);
}
TEST(ParquetDeltaDecoder, BitwidthTooBig32) {
BitwidthTooBig<int32_t>();
}
TEST(ParquetDeltaDecoder, BitwidthTooBig64) {
BitwidthTooBig<int64_t>();
}
// Strides and batch sizes and skip.
template <typename INT_T>
void DifferentStrides() {
const vector<INT_T>& plain = block_aligned_for_type<INT_T>::plain;
const std::vector<uint8_t> encoded =
DeltaCoderTest<INT_T>::encode_all(plain, 128, 32);
for (int i = sizeof(INT_T) + 1; i < 10; i++) {
const std::vector<uint8_t> decoded = DeltaCoderTest<INT_T>::decode_all(encoded, i);
DeltaCoderTest<INT_T>::template expect_equal_with_stride<INT_T>(plain, decoded, i);
}
}
TEST(ParquetDeltaDecoder, DifferentStrides32) {
DifferentStrides<int32_t>();
}
TEST(ParquetDeltaDecoder, DifferentStrides64) {
DifferentStrides<int64_t>();
}
template <typename INT_T>
void DifferentBatchSizes() {
const std::vector<INT_T>& plain = block_aligned_for_type<INT_T>::plain;
const std::vector<uint8_t> encoded =
DeltaCoderTest<INT_T>::encode_all(plain, 128, 32);
const std::vector<int> batch_sizes = {10, 14, 50, 25};
ParquetDeltaDecoder<INT_T> reader;
Status page_init_success = reader.NewPage(encoded.data(), encoded.size());
EXPECT_OK(page_init_success);
const std::size_t total_value_count = reader.GetTotalValueCount();
std::size_t read_values = 0;
std::vector<INT_T> decoded(total_value_count);
std::size_t batch_index = 0;
while (read_values < total_value_count) {
int batch_size = batch_sizes[batch_index];
int values_read_now = reader.NextValues(batch_size,
(uint8_t*) &decoded[read_values], sizeof(INT_T));
ASSERT_GT(values_read_now, 0);
ASSERT_LE(values_read_now, batch_size);
read_values += values_read_now;
batch_index = (batch_index + 1) % batch_sizes.size();
EXPECT_TRUE(values_read_now == batch_size || read_values == total_value_count);
}
EXPECT_EQ(plain, decoded);
}
TEST(ParquetDeltaDecoder, DifferentBatchSizes32) {
DifferentBatchSizes<int32_t>();
}
TEST(ParquetDeltaDecoder, DifferentBatchSizes64) {
DifferentBatchSizes<int64_t>();
}
/// Returns the values of 'plain' that remain after applying 'skip_pattern': the elements
/// of the pattern are interpreted alternately as a number of values to skip (even
/// indices) and a number of values to keep (odd indices), starting with a skip. All
/// values after the end of the pattern are kept. The sum of the pattern must not exceed
/// the length of 'plain'.
template <typename INT_T>
std::vector<INT_T> EraseSkipPattern(std::vector<INT_T> plain,
    const std::vector<std::size_t>& skip_pattern) {
  // Seed the sum with a std::size_t so that it is accumulated in the element type rather
  // than in int, and so that the comparison below is between two unsigned values.
  const std::size_t pattern_total =
      std::accumulate(skip_pattern.begin(), skip_pattern.end(), std::size_t{0});
  DCHECK_LE(pattern_total, plain.size()) << "Input vector not long enough for pattern.";

  // Number of values kept so far, i.e. the position in 'plain' where the next skipped
  // range starts.
  std::size_t write_count = 0;
  for (std::size_t i = 0; i < skip_pattern.size(); i++) {
    if (i % 2 == 0) {
      /// Skipping.
      const std::size_t skip = skip_pattern[i];
      plain.erase(plain.begin() + write_count, plain.begin() + write_count + skip);
    } else {
      write_count += skip_pattern[i];
    }
  }
  return plain;
}
template <typename INT_T>
void Skip() {
std::vector<std::vector<std::size_t>> skip_patterns = {
{12, 100, 50},
{50, 54, 125, 14, 282}
};
for (int miniblock_size : {32, 64, 96}) {
constexpr int MINIBLOCKS_IN_BLOCK = 4;
const int block_size = miniblock_size * MINIBLOCKS_IN_BLOCK;
std::vector<INT_T> plain = block_aligned_for_type<INT_T>::plain;
const std::vector<uint8_t> encoded = DeltaCoderTest<INT_T>::encode_all(plain,
block_size, miniblock_size);
const int stride = sizeof(INT_T);
for (const std::vector<std::size_t>& skip_pattern : skip_patterns) {
ParquetDeltaDecoder<INT_T> decoder;
Status page_init_success = decoder.NewPage(encoded.data(), encoded.size());
EXPECT_OK(page_init_success);
const std::vector<INT_T> expected = EraseSkipPattern(plain, skip_pattern);
std::vector<INT_T> output(expected.size());
uint8_t* const output_data_ptr = reinterpret_cast<uint8_t* const>(output.data());
std::size_t write_count = 0;
for (std::size_t i = 0; i < skip_pattern.size(); i++) {
if (i % 2 == 0) {
/// Skipping.
const std::size_t to_skip = skip_pattern[i];
const int skipped = decoder.SkipValues(to_skip);
EXPECT_EQ(to_skip, skipped);
} else {
const std::size_t to_write = skip_pattern[i];
const int written = decoder.NextValues(to_write,
output_data_ptr + write_count * stride, stride);
EXPECT_EQ(to_write, written);
write_count += to_write;
}
}
const std::size_t values_left = expected.size() - write_count;
const int written = decoder.NextValues(values_left,
output_data_ptr + write_count * stride, stride);
EXPECT_EQ(values_left, written);
EXPECT_EQ(expected.size(), write_count + written);
EXPECT_EQ(expected, output);
}
}
}
TEST(ParquetDeltaDecoder, Skip32) {
Skip<int32_t>();
}
TEST(ParquetDeltaDecoder, Skip64) {
Skip<int64_t>();
}
/// Round-trip tests.
template <typename INT_T>
void expect_code_and_decode(const std::vector<INT_T>& data,
int block_size, int miniblock_size) {
const std::vector<uint8_t> encoded = DeltaCoderTest<INT_T>::encode_all(data,
block_size, miniblock_size);
const std::vector<uint8_t> decoded_bytes = DeltaCoderTest<INT_T>::decode_all(encoded,
sizeof(INT_T));
std::vector<INT_T> decoded(decoded_bytes.size() / sizeof(INT_T));
memcpy(decoded.data(), decoded_bytes.data(), decoded_bytes.size());
EXPECT_EQ(data.size(), decoded.size());
EXPECT_EQ(data, decoded) << "Block size: " << block_size << ". Miniblock size: "
<< miniblock_size << ".";
}
template <typename INT_T>
void DifferentBlockAndMiniblockSizes() {
for (std::size_t i = 1; i < 10; i++) {
const std::size_t block_size = i * 128;
for (std::size_t j = 1; j * 32 <= block_size; j++) {
const std::size_t miniblock_size = j * 32;
if (block_size % miniblock_size == 0) {
const vector<INT_T>& plain = block_aligned_for_type<INT_T>::plain;
expect_code_and_decode<INT_T>(plain, block_size, miniblock_size);
}
}
}
}
TEST(ParquetDeltaCoder, DifferentBlockAndMiniblockSizes32) {
DifferentBlockAndMiniblockSizes<int32_t>();
}
TEST(ParquetDeltaCoder, DifferentBlockAndMiniblockSizes64) {
DifferentBlockAndMiniblockSizes<int64_t>();
}
/// Picks, uniformly at random using 'gen', one of the miniblock sizes that are valid for
/// 'block_size': the multiples of 32 not larger than 'block_size' that divide it.
/// 'block_size' has to admit at least one such size.
int ChooseRandomMiniblockSize(std::mt19937& gen, int block_size) {
  /// Collect the valid miniblock sizes.
  std::vector<int> candidates;
  int size = 32;
  while (size <= block_size) {
    if (block_size % size == 0) candidates.push_back(size);
    size += 32;
  }
  /// Both bounds of the distribution are inclusive.
  std::uniform_int_distribution<std::size_t> pick(0, candidates.size() - 1);
  return candidates[pick(gen)];
}
/// Returns a uniform distribution over a range whose magnitude is chosen on a logarithmic
/// scale: an exponent 'scale' is drawn uniformly from [0, bit width of INT_T - 3] and the
/// range is [2^scale, 2^(scale + 1)], or with probability 1/2 its mirror image
/// [-2^(scale + 1), -2^scale]. Consumes random numbers from 'gen'.
template <typename INT_T>
std::uniform_int_distribution<INT_T> GetValueDistribution(std::mt19937& gen) {
  // 'digits' is the number of value bits (bit width minus the sign bit), so this is the
  // bit width of INT_T minus 3. It keeps 2^(scale + 1) and its negation representable.
  constexpr int kMaxScale = std::numeric_limits<INT_T>::digits - 2;
  std::uniform_int_distribution<int> scale_dist(0, kMaxScale);
  const int scale = scale_dist(gen);

  // Shift in INT_T so that exponents above 31 work for 64-bit types.
  INT_T lower = INT_T{1} << scale;
  INT_T upper = INT_T{1} << (scale + 1);

  std::bernoulli_distribution change_sign_dist(0.5);
  const bool change_sign = change_sign_dist(gen);
  if (change_sign) {
    // Mirror the range around zero, keeping lower <= upper.
    const INT_T old_upper = upper;
    upper = -lower;
    lower = -old_upper;
  }
  return std::uniform_int_distribution<INT_T>(lower, upper);
}
template <typename INT_T>
void random_test(std::mt19937& gen, double p_change_range) {
std::uniform_int_distribution<std::size_t> block_size_dist(1, 4);
int block_size = 128 * block_size_dist(gen);
int miniblock_size = ChooseRandomMiniblockSize(gen, block_size);
std::uniform_int_distribution<std::size_t> num_values_dist(100, 1000);
const std::size_t num_values = num_values_dist(gen);
std::bernoulli_distribution change_range_dist(p_change_range);
std::uniform_int_distribution<INT_T> values_dist = GetValueDistribution<INT_T>(gen);
std::vector<INT_T> values(num_values);
for (std::size_t i = 0; i < num_values; i++) {
values[i] = values_dist(gen);
if (change_range_dist(gen)) {
values_dist = GetValueDistribution<INT_T>(gen);
}
}
const std::vector<uint8_t> encoded = DeltaCoderTest<INT_T>::encode_all(values,
block_size, miniblock_size);
const std::vector<uint8_t> decoded_bytes = DeltaCoderTest<INT_T>::decode_all(encoded,
sizeof(INT_T));
std::vector<INT_T> decoded(decoded_bytes.size() / sizeof(INT_T));
memcpy(decoded.data(), decoded_bytes.data(), decoded_bytes.size());
EXPECT_EQ(values, decoded);
}
/// Runs 20 randomised round trips for 'INT_T' with a 5% chance of switching the value
/// range after each value. The generator is seeded through RandTestUtil::SeedRng() with
/// the name "PARQUET_DELTA_CODER_TEST_RANDOM_SEED".
template <typename INT_T>
void RandomTests() {
  constexpr int kNumRounds = 20;
  constexpr double kChangeRangeProbability = 0.05;

  std::mt19937 gen;
  RandTestUtil::SeedRng("PARQUET_DELTA_CODER_TEST_RANDOM_SEED", &gen);
  for (int round = 0; round < kNumRounds; round++) {
    random_test<INT_T>(gen, kChangeRangeProbability);
  }
}

TEST(ParquetDeltaCoder, RandomTests32) {
  RandomTests<int32_t>();
}

TEST(ParquetDeltaCoder, RandomTests64) {
  RandomTests<int64_t>();
}
}