blob: e3a22ec3c69e24a460570c3a80b566e31bf9cf98 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include <stdio.h>
#include <stdlib.h>
#include <zstd.h>
#include <iostream>
#include "gen-cpp/Descriptors_types.h"
#include "exec/read-write-util.h"
#include "runtime/mem-tracker.h"
#include "runtime/mem-pool.h"
#include "testutil/gtest-util.h"
#include "testutil/rand-util.h"
#include "util/decompress.h"
#include "util/compress.h"
#include "util/ubsan.h"
#include "common/names.h"
using std::mt19937;
using std::uniform_int_distribution;
namespace impala {
// Fixture for testing class Decompressor
class DecompressorTest : public ::testing::Test {
protected:
// Builds the two fixture inputs:
//  - input_: 1024 repetitions of "a..z" followed by "Z..A".
//  - input_streaming_: a zero-filled buffer with a copy of input_ at its very
//    beginning and at its very end.
DecompressorTest() : mem_pool_(&mem_tracker_) {
  uint8_t* cursor = input_;
  for (int repetition = 0; repetition < 1024; ++repetition) {
    // Ascending lower-case run, then descending upper-case run.
    for (int offset = 0; offset < 26; ++offset) {
      *cursor++ = static_cast<uint8_t>('a' + offset);
    }
    for (int offset = 0; offset < 26; ++offset) {
      *cursor++ = static_cast<uint8_t>('Z' - offset);
    }
  }
  // The input for the streaming tests is a larger buffer which contains input_
  // at the beginning and end and is null otherwise.
  memset(input_streaming_, 0, sizeof(input_streaming_));
  uint8_t* const head = input_streaming_;
  uint8_t* const tail = input_streaming_ + (sizeof(input_streaming_) - sizeof(input_));
  memcpy(head, input_, sizeof(input_));
  memcpy(tail, input_, sizeof(input_));
}
// Returns everything the tests allocated from the fixture's pool.
~DecompressorTest() { mem_pool_.FreeAll(); }
// Creates a compressor/decompressor pair for 'format' (the compressor uses level
// 'clevel') and round-trips the fixture input through them in block mode, then
// checks how the decompressor copes with under- and over-sized output buffers.
void RunTest(THdfsCompression::type format, int clevel = 0) {
  Codec::CodecInfo codec_info(format, clevel);
  scoped_ptr<Codec> compressor;
  scoped_ptr<Codec> decompressor;
  EXPECT_OK(Codec::CreateCompressor(&mem_pool_, true, codec_info, &compressor));
  EXPECT_OK(Codec::CreateDecompressor(&mem_pool_, true, format, &decompressor));
  Codec* const comp = compressor.get();
  Codec* const decomp = decompressor.get();

  // LZ4 & ZSTD are not implemented to work without an allocated output
  const bool needs_preallocated_output = format == THdfsCompression::LZ4
      || format == THdfsCompression::ZSTD
      || format == THdfsCompression::LZ4_BLOCKED;

  if (needs_preallocated_output) {
    CompressAndDecompressNoOutputAllocated(comp, decomp, sizeof(input_), input_);
    CompressAndDecompressNoOutputAllocated(comp, decomp, 0, nullptr);
  } else {
    CompressAndDecompress(comp, decomp, sizeof(input_), input_);
    // Test with odd-length input (to test the calculation of block-sizes in
    // SnappyBlockCompressor)
    CompressAndDecompress(comp, decomp, sizeof(input_) - 1, input_);
    // Test with input length of 1024 (to test SnappyBlockCompressor with a single
    // block)
    CompressAndDecompress(comp, decomp, 1024, input_);
    // Test with empty input. bzip does not allow NULL input, so it gets a valid
    // pointer together with the zero length.
    uint8_t* const empty_input = (format == THdfsCompression::BZIP2) ? input_ : nullptr;
    CompressAndDecompress(comp, decomp, 0, empty_input);
  }

  DecompressOverUnderSizedOutputBuffer(comp, decomp, sizeof(input_), input_);

  compressor->Close();
  decompressor->Close();
}
// Creates a compressor/decompressor pair for 'format' and checks streaming
// decompression of the large streaming input, of a NULL input, and of a
// zero-length input that has a valid pointer.
void RunTestStreaming(THdfsCompression::type format) {
  Codec::CodecInfo codec_info(format);
  scoped_ptr<Codec> compressor;
  scoped_ptr<Codec> decompressor;
  EXPECT_OK(Codec::CreateCompressor(&mem_pool_, true, codec_info, &compressor));
  EXPECT_OK(Codec::CreateDecompressor(&mem_pool_, true, format, &decompressor));
  Codec* const comp = compressor.get();
  Codec* const decomp = decompressor.get();

  CompressAndStreamingDecompress(comp, decomp, sizeof(input_streaming_),
      input_streaming_);
  CompressAndStreamingDecompress(comp, decomp, 0, nullptr);
  CompressAndStreamingDecompress(comp, decomp, 0, input_);

  compressor->Close();
  decompressor->Close();
}
// Round-trips 'input' (of 'input_len' bytes) through 'compressor' and
// 'decompressor' twice and verifies the decompressed bytes match the input:
//  1. with output buffers allocated by the codecs themselves;
//  2. with output buffers preallocated by the caller. The compression step of this
//     pass is skipped when the compressor reports -1 from MaxOutputLen(); in that
//     case the compressed data from pass 1 is decompressed again.
void CompressAndDecompress(Codec* compressor, Codec* decompressor,
    int64_t input_len, uint8_t* input) {
  // Non-preallocated output buffers
  uint8_t* compressed = nullptr;
  int64_t compressed_length = 0;
  EXPECT_OK(compressor->ProcessBlock(false, input_len,
      input, &compressed_length, &compressed));
  uint8_t* output = nullptr;
  int64_t output_len = 0;
  EXPECT_OK(decompressor->ProcessBlock(false, compressed_length,
      compressed, &output_len, &output));
  EXPECT_EQ(output_len, input_len);
  EXPECT_EQ(Ubsan::MemCmp(input, output, input_len), 0);

  // Preallocated output buffers
  int64_t max_compressed_length = compressor->MaxOutputLen(input_len, input);
  // Don't redo compression if compressor doesn't support MaxOutputLen()
  if (max_compressed_length != -1) {
    EXPECT_GE(max_compressed_length, 0);
    // Assign to the function-level 'compressed' instead of declaring a new local
    // here: a shadowing declaration would leave the decompression below reading the
    // buffer from the first pass while using the length from this pass, so the
    // preallocated compression output would never actually be verified.
    compressed = mem_pool_.Allocate(max_compressed_length);
    compressed_length = max_compressed_length;
    EXPECT_OK(compressor->ProcessBlock(true, input_len, input, &compressed_length,
        &compressed));
  }
  output_len = decompressor->MaxOutputLen(compressed_length, compressed);
  // -1 means the decompressor cannot predict the size; the input length is the
  // exact size the round trip must produce.
  if (output_len == -1) output_len = input_len;
  output = mem_pool_.Allocate(output_len);
  EXPECT_OK(decompressor->ProcessBlock(true, compressed_length, compressed,
      &output_len, &output));
  EXPECT_EQ(output_len, input_len);
  EXPECT_EQ(Ubsan::MemCmp(input, output, input_len), 0);
}
// Test the behavior when the decompressor is given too little / too much space.
// Verify that the decompressor returns an error when the space is not enough, gives
// the correct output size when the space is enough, and does not write beyond the
// output size it claims.
// 'input' (of 'input_len' bytes) is compressed with 'compressor' first; the
// compressor output is preallocated whenever MaxOutputLen() reports a positive size.
void DecompressOverUnderSizedOutputBuffer(Codec* compressor, Codec* decompressor,
    int64_t input_len, uint8_t* input) {
  uint8_t* compressed = nullptr;
  int64_t compressed_length = 0;
  bool compress_preallocated = false;
  int64_t max_compressed_length = compressor->MaxOutputLen(input_len, input);
  if (max_compressed_length > 0) {
    compressed = mem_pool_.Allocate(max_compressed_length);
    compressed_length = max_compressed_length;
    compress_preallocated = true;
  }
  EXPECT_OK(compressor->ProcessBlock(compress_preallocated, input_len,
      input, &compressed_length, &compressed));

  int64_t output_len = decompressor->MaxOutputLen(compressed_length, compressed);
  if (output_len == -1) output_len = input_len;
  uint8_t* output = mem_pool_.Allocate(output_len);

  // Check that the decompressor respects the output_len by passing in an
  // output len that is 4 bytes too small and verifying that those 4 bytes
  // are not touched. The decompressor should return a non-ok status, as it
  // does not have space to decompress the full output.
  const uint32_t canary_value = 0x66aa77bb;
  const int64_t canary_size = sizeof(canary_value);
  // The canary lives in the last bytes of the buffer, so the buffer must be at least
  // that large.
  ASSERT_GE(output_len, canary_size);
  output_len = output_len - canary_size;
  // Remember the canary position now: ProcessBlock() overwrites 'output_len'.
  // The canary is written and read back with memcpy so that no particular alignment
  // of this address is required.
  uint8_t* const canary_pos = &output[output_len];
  memcpy(canary_pos, &canary_value, sizeof(canary_value));
  Status status = decompressor->ProcessBlock(true, compressed_length, compressed,
      &output_len, &output);
  uint32_t canary_after = 0;
  memcpy(&canary_after, canary_pos, sizeof(canary_after));
  EXPECT_EQ(canary_after, canary_value);
  EXPECT_FALSE(status.ok());
  EXPECT_EQ(output_len, 0);

  // Check that the output length is the same as input when the decompressor is provided
  // with abundant space.
  output_len = input_len * 2;
  output = mem_pool_.Allocate(output_len);
  EXPECT_TRUE(decompressor->ProcessBlock(true, compressed_length, compressed,
      &output_len, &output).ok());
  EXPECT_EQ(output_len, input_len);
}
// Compresses 'input' with 'compressor' and reports the result through 'output' and
// 'output_len'. A NULL input combined with the bzip2 compressor is not compressed at
// all; an empty result (NULL, 0) is reported instead.
void Compress(Codec* compressor, int64_t input_len, uint8_t* input,
    int64_t* output_len, uint8_t** output, bool output_preallocated) {
  // The file extension is only consulted when the input is NULL.
  if (input != NULL || !(compressor->file_extension() == "bz2")) {
    EXPECT_OK(compressor->ProcessBlock(output_preallocated, input_len,
        input, output_len, output));
    return;
  }
  // bzip does not allow NULL input
  *output_len = 0;
  *output = NULL;
}
// Feeds 'input' (of 'input_len' compressed bytes) to
// decompressor->ProcessBlockStreaming() until all of it has been consumed, comparing
// every chunk of output against the matching part of 'uncompressed_input'.
//  - uncompressed_len: upper bound for the total decompressed size; it must be
//    matched exactly when the decompressor reports the end of the stream.
//  - expected_stream_end: the value 'stream_end' must have after the last call.
//  - bytes_decompressed: if non-NULL, receives the total number of output bytes.
// Returns the first error reported by ProcessBlockStreaming(), otherwise OK.
Status StreamingDecompress(Codec* decompressor, int64_t input_len, uint8_t* input,
    int64_t uncompressed_len, uint8_t* uncompressed_input, bool expected_stream_end,
    int64_t* bytes_decompressed = NULL) {
  // Should take multiple calls to ProcessBlockStreaming() to decompress the buffer.
  int64_t decompressed_len = 0;
  int64_t compressed_bytes_remaining = input_len;
  uint8_t* compressed_input = input;
  // Initialized so that the checks after the loop never read an indeterminate value,
  // even if the decompressor returns OK without writing this out-parameter.
  bool stream_end = false;
  do {
    uint8_t* output = NULL;
    int64_t output_len = 0;
    int64_t compressed_bytes_read = 0;
    RETURN_IF_ERROR(decompressor->ProcessBlockStreaming(compressed_bytes_remaining,
        compressed_input, &compressed_bytes_read, &output_len, &output, &stream_end));
    EXPECT_EQ(
        Ubsan::MemCmp(uncompressed_input + decompressed_len, output, output_len), 0);
    decompressed_len += output_len;
    EXPECT_LE(decompressed_len, uncompressed_len);
    // Advance past the compressed bytes this call consumed.
    compressed_input = compressed_input + compressed_bytes_read;
    compressed_bytes_remaining -= compressed_bytes_read;
  } while (compressed_bytes_remaining > 0);
  EXPECT_EQ(0, compressed_bytes_remaining);
  EXPECT_EQ(stream_end, expected_stream_end);
  if (stream_end) {
    EXPECT_EQ(decompressed_len, uncompressed_len);
  }
  if (bytes_decompressed != NULL) *bytes_decompressed = decompressed_len;
  return Status::OK();
}
// Compresses 'input' into a codec-allocated buffer, then verifies that streaming
// decompression of the result reproduces 'input'.
void CompressAndStreamingDecompress(Codec* compressor, Codec* decompressor,
    int64_t input_len, uint8_t* input) {
  int64_t compressed_length = 0;
  uint8_t* compressed = NULL;
  Compress(compressor, input_len, input, &compressed_length, &compressed, false);
  // If compressed_len is 0, there is nothing to decompress so should not expect
  // "stream_end == true" either.
  // Note the gzip compressor will generate some compressed data even if input == NULL
  // or input_len == 0.
  const bool expect_stream_end = compressed_length > 0;
  EXPECT_OK(StreamingDecompress(decompressor, compressed_length, compressed, input_len,
      input, expect_stream_end));
}
// Round-trips 'input' using caller-preallocated output buffers only, for codecs that
// cannot allocate their own output. Aborts the calling test when the compressor does
// not report a positive maximum output length.
void CompressAndDecompressNoOutputAllocated(Codec* compressor,
    Codec* decompressor, int64_t input_len, uint8_t* input) {
  // Compression into a buffer sized by the compressor's own upper bound.
  const int64_t max_compressed_length = compressor->MaxOutputLen(input_len, input);
  ASSERT_GT(max_compressed_length, 0);
  int64_t compressed_length = max_compressed_length;
  uint8_t* compressed = mem_pool_.Allocate(max_compressed_length);
  EXPECT_OK(compressor->ProcessBlock(true, input_len, input, &compressed_length,
      &compressed));

  // Decompression into a buffer sized by the decompressor, falling back to the
  // original input length when the decompressor reports -1.
  int64_t output_len = decompressor->MaxOutputLen(compressed_length, compressed);
  if (output_len == -1) {
    output_len = input_len;
  }
  uint8_t* output = mem_pool_.Allocate(output_len);
  EXPECT_OK(decompressor->ProcessBlock(true, compressed_length, compressed,
      &output_len, &output));

  EXPECT_EQ(output_len, input_len);
  EXPECT_EQ(Ubsan::MemCmp(input, output, input_len), 0);
}
// Exercises streaming decompression of several concatenated compressed streams for
// 'format': the complete data, the data cut at a random point and then resumed, and
// the data followed by whatever bytes trail it in the compressed buffer.
void RunTestMultiStreamDecompressing(THdfsCompression::type format) {
  int64_t uncompressed_len = 0;
  int64_t compressed_len = 0;
  uint8_t* uncompressed = NULL;
  uint8_t* compressed = NULL;
  // Generate multistream test data
  GenerateMultiStreamData(format, &uncompressed_len, &uncompressed,
      &compressed_len, &compressed);

  scoped_ptr<Codec> decompressor;
  EXPECT_OK(Codec::CreateDecompressor(&mem_pool_, true, format, &decompressor));
  Codec* const decomp = decompressor.get();

  // Test case 1. normal streams.
  EXPECT_OK(StreamingDecompress(decomp, compressed_len, compressed,
      uncompressed_len, uncompressed, true));

  // Test case 2. multistream that is truncated. We should get stream_end == false
  // but with no error.
  const int truncated = 1 + rand() % 512;
  int64_t bytes_decompressed = 0;
  ASSERT_LE(truncated, compressed_len);
  const int64_t head_len = compressed_len - truncated;
  EXPECT_OK(StreamingDecompress(decomp, head_len, compressed, uncompressed_len,
      uncompressed, false, &bytes_decompressed));
  // Decompress the remaining.
  uint8_t* const compressed_tail = compressed + head_len;
  uint8_t* const uncompressed_tail = uncompressed + bytes_decompressed;
  EXPECT_OK(StreamingDecompress(decomp, truncated, compressed_tail,
      uncompressed_len - bytes_decompressed, uncompressed_tail, true));

  // Test case 3. multistream with junk data at the end.
  EXPECT_ERROR(StreamingDecompress(decomp, COMPRESSED_BUFFER_SIZE,
      compressed, uncompressed_len, uncompressed, false),
      TErrorCode::COMPRESSED_FILE_BLOCK_CORRUPTED);

  decompressor->Close();
}
// Try to simulate pbzip2 behavior. pbzip2 splits large input into smaller chunks
// and compresses them separately, then concatenate the compressed streams together.
// We generate ~16MB compressed data to make sure it's bigger than the decompressor's
// output buffer size(STREAM_OUT_BUF_SIZE). With the generated raw input data, we
// expect ~2:1 compression ratio so we need 4xSTREAM_OUT_BUF_SIZE input data in total.
//
// Outputs (both buffers are allocated from mem_pool_):
//  - compressed_data / compressed_len: the concatenated compressed streams.
//  - uncompressed_data / uncompressed_len: the concatenation of the raw inputs that
//    were compressed, in the same order.
void GenerateMultiStreamData(THdfsCompression::type format, int64_t* uncompressed_len,
    uint8_t** uncompressed_data, int64_t* compressed_len, uint8_t** compressed_data) {
  // The scratch input is RAW_INPUT_SIZE (1MB) large, so it is kept on the heap rather
  // than in this function's stack frame.
  unique_ptr<uint8_t[]> raw_input(new uint8_t[RAW_INPUT_SIZE + 1]);
  for (int i = 0; i < RAW_INPUT_SIZE; ++i) {
    raw_input[i] = 'a' + rand() % 26;
  }
  raw_input[RAW_INPUT_SIZE] = 0;

  // Repeatedly pick random-size input data(~1MB), compress it, then concatenate
  // those small compressed streams into one big buffer. Also save random input
  // into a single buffer to verify decompressor output.
  *compressed_data = mem_pool_.Allocate(COMPRESSED_BUFFER_SIZE);
  *uncompressed_data = mem_pool_.Allocate(UNCOMPRESSED_BUFFER_SIZE);
  *uncompressed_len = 0;
  *compressed_len = 0;

  scoped_ptr<Codec> compressor;
  Codec::CodecInfo codec_info(format);
  EXPECT_OK(Codec::CreateCompressor(&mem_pool_, true, codec_info, &compressor));

  // Make sure we don't completely fill the buffer, leave at least RAW_INPUT_SIZE
  // bytes free in compressed buffer for junk data testing (Test case 3).
  while (*compressed_len < (COMPRESSED_BUFFER_SIZE - RAW_INPUT_SIZE)
      && *uncompressed_len < (UNCOMPRESSED_BUFFER_SIZE - RAW_INPUT_SIZE)) {
    // Each stream compresses a prefix of the scratch input that is at most 1023
    // bytes shorter than the whole of it.
    int len = RAW_INPUT_SIZE - (rand() % 1024);
    uint8_t* compressed_stream = NULL;
    int64_t compressed_length = 0;
    EXPECT_OK(compressor->ProcessBlock(false, len, raw_input.get(), &compressed_length,
        &compressed_stream));
    memcpy(*compressed_data + *compressed_len, compressed_stream, compressed_length);
    memcpy(*uncompressed_data + *uncompressed_len, raw_input.get(), len);
    *uncompressed_len += len;
    *compressed_len += compressed_length;
  }
  compressor->Close();
}
// Size of the scratch buffer of random letters that GenerateMultiStreamData()
// compresses repeatedly. Size doesn't matter, use 1MB for easy calculation.
static const int RAW_INPUT_SIZE = 1024 * 1024;
// Need 2x STREAM_OUT_BUF_SIZE compressed data to make sure it's bigger than the
// decompressor's output buffer size.
static const int COMPRESSED_BUFFER_SIZE = 2 * Codec::STREAM_OUT_BUF_SIZE;
// With the generated raw input data, we expect ~2:1 compression ratio so we need
// 2x COMPRESSED_BUFFER_SIZE (i.e. 4x STREAM_OUT_BUF_SIZE) input data in total.
static const int UNCOMPRESSED_BUFFER_SIZE = 2 * COMPRESSED_BUFFER_SIZE;
// Buffer filled by the constructor with a fixed pattern: the letters [a..z] followed
// by [Z..A], repeated 1024 times. Used for compressor/decompressor testing.
uint8_t input_[2 * 26 * 1024];
// Buffer for testing ProcessBlockStreaming() which allocates STREAM_OUT_BUF_SIZE output
// buffer. This is 4x the size of the output buffers to ensure that the decompressed output
// requires several calls and doesn't need to be nicely aligned (the last call gets a
// small amount of data).
uint8_t input_streaming_[UNCOMPRESSED_BUFFER_SIZE];
// Declared before mem_pool_ on purpose: the constructor hands &mem_tracker_ to
// mem_pool_, and members are initialized in declaration order.
MemTracker mem_tracker_;
// Pool that the fixture's helpers allocate their buffers from; freed in the destructor.
MemPool mem_pool_;
};
// Block round-trip coverage for the DEFAULT codec.
TEST_F(DecompressorTest, Default) { RunTest(THdfsCompression::DEFAULT); }
// Block round-trip coverage for the SNAPPY codec.
TEST_F(DecompressorTest, Snappy) { RunTest(THdfsCompression::SNAPPY); }
// Block round-trip coverage for the LZ4 codec.
TEST_F(DecompressorTest, LZ4) { RunTest(THdfsCompression::LZ4); }
// GZIP: block round trip, streaming decompression and multi-stream decompression.
TEST_F(DecompressorTest, Gzip) {
  const THdfsCompression::type format = THdfsCompression::GZIP;
  RunTest(format);
  RunTestStreaming(format);
  RunTestMultiStreamDecompressing(format);
}
// DEFLATE: block round trip, streaming decompression and multi-stream decompression.
TEST_F(DecompressorTest, Deflate) {
  const THdfsCompression::type format = THdfsCompression::DEFLATE;
  RunTest(format);
  RunTestStreaming(format);
  RunTestMultiStreamDecompressing(format);
}
// BZIP2: block round trip, streaming decompression and multi-stream decompression.
TEST_F(DecompressorTest, Bzip) {
  const THdfsCompression::type format = THdfsCompression::BZIP2;
  RunTest(format);
  RunTestStreaming(format);
  RunTestMultiStreamDecompressing(format);
}
// Block round-trip coverage for the SNAPPY_BLOCKED codec.
TEST_F(DecompressorTest, SnappyBlocked) { RunTest(THdfsCompression::SNAPPY_BLOCKED); }
// Regression test for IMPALA-1506: compresses the same three bytes twice with a gzip
// compressor that allocates its own output, and expects a non-negative output length
// each time.
TEST_F(DecompressorTest, Impala1506) {
  MemTracker trax;
  MemPool pool(&trax);
  Codec::CodecInfo codec_info(impala::THdfsCompression::GZIP);
  scoped_ptr<Codec> compressor;
  EXPECT_OK(Codec::CreateCompressor(&pool, true, codec_info, &compressor));

  const uint8_t input[3] = {1, 2, 3};
  const int64_t input_len = 3;
  uint8_t* output = NULL;
  // call twice because the compressor will reallocate the first time
  for (int attempt = 0; attempt < 2; ++attempt) {
    int64_t output_len = -1;
    EXPECT_OK(compressor->ProcessBlock(false, input_len, input, &output_len, &output));
    EXPECT_GE(output_len, 0);
  }
  pool.FreeAll();
}
// Regression test for IMPALA-5250. It tests that SnappyDecompressor handles an input
// buffer with a zero byte correctly. It should set the output_length to 0.
TEST_F(DecompressorTest, Impala5250) {
  MemTracker trax;
  MemPool pool(&trax);
  scoped_ptr<Codec> decompressor;
  EXPECT_OK(Codec::CreateDecompressor(&pool, true, impala::THdfsCompression::SNAPPY,
      &decompressor));

  // One-byte compressed input holding a single zero, and a one-byte output buffer
  // supplied by the caller.
  uint8_t zero_input[1] = {0};
  uint8_t out_buf[1];
  uint8_t* output = out_buf;
  int64_t output_length = 1;
  EXPECT_OK(decompressor->ProcessBlock(true, 1, zero_input, &output_length, &output));
  EXPECT_EQ(output_length, 0);
}
// IMPALA-5987: When Lz4Compressor::MaxOutputLen() returns 0,
// it means that the input is too large to compress, therefore trying
// to compress it should fail.
TEST_F(DecompressorTest, LZ4Huge) {
  // Generate a big random payload.
  const int payload_len = numeric_limits<int>::max();
  unique_ptr<uint8_t[]> payload(new uint8_t[payload_len]);
  uint8_t* const payload_bytes = payload.get();
  for (int pos = 0; pos < payload_len; ++pos) {
    payload_bytes[pos] = rand();
  }

  Codec::CodecInfo codec_info(impala::THdfsCompression::LZ4);
  scoped_ptr<Codec> compressor;
  EXPECT_OK(Codec::CreateCompressor(nullptr, true, codec_info, &compressor));

  // The returned max_size is 0 because the payload is too big.
  const int64_t max_size = compressor->MaxOutputLen(payload_len);
  ASSERT_EQ(max_size, 0);

  // Trying to compress it should give an error
  unique_ptr<uint8_t[]> compressed(new uint8_t[max_size]);
  uint8_t* compressed_ptr = compressed.get();
  int64_t compressed_len = max_size;
  EXPECT_ERROR(compressor->ProcessBlock(true, payload_len, payload_bytes,
      &compressed_len, &compressed_ptr), TErrorCode::LZ4_COMPRESSION_INPUT_TOO_LARGE);
}
// ZSTD block round trip, first at the library's default compression level and then
// at a level drawn at random from the seeded generator.
TEST_F(DecompressorTest, ZSTD) {
  RunTest(THdfsCompression::ZSTD, ZSTD_CLEVEL_DEFAULT);

  mt19937 rng;
  RandTestUtil::SeedRng("ZSTD_COMPRESSION_LEVEL_SEED", &rng);
  // zstd supports compression levels from 1 up to ZSTD_maxCLevel()
  uniform_int_distribution<int> level_dist(1, ZSTD_maxCLevel());
  const int clevel = level_dist(rng);
  RunTest(THdfsCompression::ZSTD, clevel);
}
// Builds a buffer in Hadoop's lz4 block layout by hand from the fixture input and
// checks that the LZ4_BLOCKED decompressor turns it back into that input.
TEST_F(DecompressorTest, LZ4HadoopCompat) {
  const THdfsCompression::type format = THdfsCompression::LZ4_BLOCKED;
  Codec::CodecInfo codec_info(format);
  scoped_ptr<Codec> compressor;
  scoped_ptr<Codec> decompressor;
  EXPECT_OK(Codec::CreateCompressor(&mem_pool_, true, codec_info, &compressor));
  EXPECT_OK(Codec::CreateDecompressor(&mem_pool_, true, format, &decompressor));

  // Hadoop uses a block compression scheme on top of lz4. The input stream could be
  // split into blocks and each block contains the uncompressed length for the block
  // followed by one of more length-prefixed blocks of compressed data.
  // Block layout:
  // <4 byte big endian uncompressed_size><inner block 1>...<inner block N>
  // inner block <i> layout: <4-byte big endian compressed_size><lz4 compressed block>
  // For this unit test we assume an inner block size of 8K.
  const int64_t input_len = sizeof(input_);
  const int64_t block_len = 8192;
  // Number of inner blocks, counting a trailing partial block as one.
  const int64_t num_blocks = (input_len + block_len - 1) / block_len;
  const int64_t max_compressed_block_length = compressor->MaxOutputLen(block_len, NULL);
  const int64_t total_compressed_buffer_length =
      max_compressed_block_length * num_blocks;
  const int64_t size_prefix_len = sizeof(uint32_t);

  uint8_t* const compressed_buffer = mem_pool_.Allocate(total_compressed_buffer_length);
  // Write the total uncompressed_size
  ReadWriteUtil::PutInt(compressed_buffer, static_cast<uint32_t>(input_len));
  int64_t compressed_buffer_index = size_prefix_len;

  // Preallocate output buffers for compressor
  uint8_t* const compressed_block = mem_pool_.Allocate(max_compressed_block_length);

  // Generate Hadoop's compressed block layout: every full 8K block in order, then
  // the last block (not a multiple of 8K bytes), if any.
  for (int64_t offset = 0; offset < input_len; offset += block_len) {
    const int64_t bytes_left = input_len - offset;
    const int64_t chunk_len = bytes_left < block_len ? bytes_left : block_len;
    uint8_t* chunk = &input_[offset];
    uint8_t* compressed = compressed_block;
    int64_t compressed_length = max_compressed_block_length;
    EXPECT_OK(compressor->ProcessBlock(true, chunk_len, chunk, &compressed_length,
        &compressed));
    // The first 4 bytes of the compressor's output are left out of the copy; the
    // total uncompressed size was already written once at the front of the buffer.
    const int64_t inner_block_len = compressed_length - size_prefix_len;
    memcpy(compressed_buffer + compressed_buffer_index, compressed + size_prefix_len,
        inner_block_len);
    compressed_buffer_index += inner_block_len;
  }
  DCHECK_LE(compressed_buffer_index, total_compressed_buffer_length);

  // Now that we have a compressed block using Hadoop's lz4 block compression scheme,
  // let's decompress it using Impala's LZ4BlockDecompressor.
  int64_t output_len = input_len;
  uint8_t* output = mem_pool_.Allocate(output_len);
  EXPECT_OK(decompressor->ProcessBlock(true, compressed_buffer_index, compressed_buffer,
      &output_len, &output));
  EXPECT_EQ(output_len, input_len);
  EXPECT_EQ(memcmp(input_, output, input_len), 0);
}
// Block round-trip coverage for the LZ4_BLOCKED codec.
TEST_F(DecompressorTest, LZ4Blocked) { RunTest(THdfsCompression::LZ4_BLOCKED); }
}
// Test entry point: initializes GoogleTest and the Impala runtime, seeds rand() from
// the current time (logging the seed so a run can be reproduced) and runs all tests.
int main(int argc, char **argv) {
  ::testing::InitGoogleTest(&argc, argv);
  impala::InitCommonRuntime(argc, argv, false, impala::TestInfo::BE_TEST);

  const int rand_seed = time(NULL);
  srand(rand_seed);
  LOG(INFO) << "rand_seed: " << rand_seed;

  return RUN_ALL_TESTS();
}