blob: 182a83984e531c2bf09ab449150c04966828ceae [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "ColumnWriter.hh"
#include "Compression.hh"
#include "MemoryOutputStream.hh"
#include "RLEv1.hh"
#include "wrap/orc-proto-wrapper.hh"
#include "wrap/gtest-wrapper.h"
#include <algorithm>
namespace orc {
const int DEFAULT_MEM_STREAM_SIZE = 1024 * 1024 * 2; // 2M
void generateRandomData(
char * data,
size_t size,
bool letter) {
for (size_t i = 0; i < size; ++i) {
if (letter) {
bool capitalized = std::rand() % 2 == 0;
data[i] = capitalized ? static_cast<char>('A' + std::rand() % 26)
: static_cast<char>('a' + std::rand() % 26);
} else {
data[i] = static_cast<char>(std::rand() % 256);
}
}
}
void decompressAndVerify(const MemoryOutputStream& memStream,
CompressionKind kind,
const char * data,
size_t size,
MemoryPool& pool) {
std::unique_ptr<SeekableInputStream> inputStream(
new SeekableArrayInputStream(memStream.getData(), memStream.getLength()));
std::unique_ptr<SeekableInputStream> decompressStream =
createDecompressor(kind, std::move(inputStream), 1024, pool);
const char * decompressedBuffer;
int decompressedSize;
int pos = 0;
while (decompressStream->Next(
reinterpret_cast<const void**>(&decompressedBuffer),
&decompressedSize)) {
for (int i = 0; i < decompressedSize; ++i) {
EXPECT_LT(static_cast<size_t>(pos), size);
EXPECT_EQ(data[pos], decompressedBuffer[i]);
++pos;
}
}
}
void compressAndVerify(CompressionKind kind,
OutputStream * outStream,
CompressionStrategy strategy,
uint64_t capacity,
uint64_t block,
MemoryPool& pool,
const char * data,
size_t dataSize) {
std::unique_ptr<BufferedOutputStream> compressStream =
createCompressor(kind,
outStream,
strategy,
capacity,
block,
pool);
size_t pos = 0;
char * compressBuffer;
int compressBufferSize = 0;
while (dataSize > 0 &&
compressStream->Next(
reinterpret_cast<void**>(&compressBuffer),
&compressBufferSize)) {
size_t copy_size = std::min(
static_cast<size_t>(compressBufferSize),
dataSize);
memcpy(compressBuffer, data + pos, copy_size);
if (copy_size == dataSize) {
compressStream->BackUp(compressBufferSize - static_cast<int>(dataSize));
}
pos += copy_size;
dataSize -= copy_size;
}
EXPECT_EQ(0, dataSize);
compressStream->flush();
}
void compress_original_string(orc::CompressionKind kind) {
MemoryOutputStream memStream(DEFAULT_MEM_STREAM_SIZE);
MemoryPool * pool = getDefaultPool();
uint64_t capacity = 1024;
uint64_t block = 128;
// simple, short string which will result in the original being saved
char testData [] = "hello world!";
compressAndVerify(kind,
&memStream,
CompressionStrategy_SPEED,
capacity,
block,
*pool,
testData,
sizeof(testData));
decompressAndVerify(memStream,
kind,
testData,
sizeof(testData),
*pool);
}
TEST(TestCompression, zlib_compress_original_string) {
compress_original_string(CompressionKind_ZLIB);
}
void compress_simple_repeated_string(orc::CompressionKind kind) {
MemoryOutputStream memStream(DEFAULT_MEM_STREAM_SIZE);
MemoryPool * pool = getDefaultPool();
uint64_t capacity = 1024;
uint64_t block = 128;
// simple repeated string (50 'a's) which should be compressed
char testData [] = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa";
compressAndVerify(kind,
&memStream,
CompressionStrategy_SPEED,
capacity,
block,
*pool,
testData,
sizeof(testData));
decompressAndVerify(memStream,
kind,
testData,
sizeof(testData),
*pool);
}
TEST(TestCompression, compress_simple_repeated_string) {
compress_simple_repeated_string(CompressionKind_ZLIB);
}
void compress_two_blocks(orc::CompressionKind kind) {
MemoryOutputStream memStream(DEFAULT_MEM_STREAM_SIZE);
MemoryPool * pool = getDefaultPool();
uint64_t capacity = 1024;
uint64_t block = 128;
// testData will be compressed in two blocks
char testData [170];
for (int i = 0; i < 170; ++i) {
testData[i] = 'a';
}
compressAndVerify(kind,
&memStream,
CompressionStrategy_SPEED,
capacity,
block,
*pool,
testData,
170);
decompressAndVerify(memStream,
kind,
testData,
170,
*pool);
}
TEST(TestCompression, zlib_compress_two_blocks) {
compress_two_blocks(CompressionKind_ZLIB);
}
void compress_random_letters(orc::CompressionKind kind) {
MemoryOutputStream memStream(DEFAULT_MEM_STREAM_SIZE);
MemoryPool * pool = getDefaultPool();
uint64_t capacity = 4096;
uint64_t block = 1024;
size_t dataSize = 1024 * 1024; // 1M
// testData will be compressed in two blocks
char * testData = new char [dataSize];
generateRandomData(testData, dataSize, true);
compressAndVerify(kind,
&memStream,
CompressionStrategy_SPEED,
capacity,
block,
*pool,
testData,
dataSize);
decompressAndVerify(memStream,
kind,
testData,
dataSize,
*pool);
delete [] testData;
}
TEST(TestCompression, zlib_compress_random_letters) {
compress_random_letters(CompressionKind_ZLIB);
}
void compress_random_bytes(orc::CompressionKind kind) {
MemoryOutputStream memStream(DEFAULT_MEM_STREAM_SIZE);
MemoryPool * pool = getDefaultPool();
uint64_t capacity = 4096;
uint64_t block = 1024;
size_t dataSize = 1024 * 1024; // 1M
// testData will be compressed in two blocks
char * testData = new char [dataSize];
generateRandomData(testData, dataSize, false);
compressAndVerify(kind,
&memStream,
CompressionStrategy_SPEED,
capacity,
block,
*pool,
testData,
dataSize);
decompressAndVerify(memStream,
kind,
testData,
dataSize,
*pool);
delete [] testData;
}
TEST(TestCompression, zlib_compress_random_bytes) {
compress_random_bytes(CompressionKind_ZLIB);
}
void protobuff_compression(orc::CompressionKind kind,
proto::CompressionKind protoKind) {
MemoryOutputStream memStream(DEFAULT_MEM_STREAM_SIZE);
MemoryPool * pool = getDefaultPool();
uint64_t capacity = 4096;
uint64_t block = 256;
proto::PostScript ps;
ps.set_footerlength(197934);
ps.set_compression(protoKind);
ps.set_metadatalength(100);
ps.set_writerversion(789);
ps.set_magic("protobuff_serialization");
for (uint32_t i = 0; i < 1024; ++i) {
ps.add_version(static_cast<uint32_t>(std::rand()));
}
std::unique_ptr<BufferedOutputStream> compressStream =
createCompressor(kind,
&memStream,
CompressionStrategy_SPEED,
capacity,
block,
*pool);
EXPECT_TRUE(ps.SerializeToZeroCopyStream(compressStream.get()));
compressStream->flush();
std::unique_ptr<SeekableInputStream> inputStream(
new SeekableArrayInputStream(memStream.getData(), memStream.getLength()));
std::unique_ptr<SeekableInputStream> decompressStream =
createDecompressor(kind,
std::move(inputStream),
1024,
*pool);
proto::PostScript ps2;
ps2.ParseFromZeroCopyStream(decompressStream.get());
EXPECT_EQ(ps.footerlength(), ps2.footerlength());
EXPECT_EQ(ps.compression(), ps2.compression());
EXPECT_EQ(ps.metadatalength(), ps2.metadatalength());
EXPECT_EQ(ps.writerversion(), ps2.writerversion());
EXPECT_EQ(ps.magic(), ps2.magic());
for (int i = 0; i < 1024; ++i) {
EXPECT_EQ(ps.version(i), ps2.version(i));
}
}
TEST(TestCompression, zlib_protobuff_compression) {
protobuff_compression(CompressionKind_ZLIB, proto::ZLIB);
}
TEST(Compression, zstd_compress_original_string) {
compress_original_string(CompressionKind_ZSTD);
}
TEST(Compression, zstd_compress_simple_repeated_string) {
compress_simple_repeated_string(CompressionKind_ZSTD);
}
TEST(Compression, zstd_compress_two_blocks) {
compress_two_blocks(CompressionKind_ZSTD);
}
TEST(Compression, zstd_compress_random_letters) {
compress_random_letters(CompressionKind_ZSTD);
}
TEST(Compression, zstd_compress_random_bytes) {
compress_random_bytes(CompressionKind_ZSTD);
}
TEST(Compression, zstd_protobuff_compression) {
protobuff_compression(CompressionKind_ZSTD, proto::ZSTD);
}
void testSeekDecompressionStream(CompressionKind kind) {
MemoryOutputStream memStream(DEFAULT_MEM_STREAM_SIZE);
MemoryPool * pool = getDefaultPool();
CompressionStrategy strategy = CompressionStrategy_COMPRESSION;
uint64_t batchSize = 1024, blockSize = 256;
AppendOnlyBufferedStream outStream(createCompressor(
kind, &memStream, strategy, DEFAULT_MEM_STREAM_SIZE, blockSize, *pool));
// write 3 batches of data and record positions between every batch
size_t row = 0;
proto::RowIndexEntry rowIndexEntry1, rowIndexEntry2;
RowIndexPositionRecorder recorder1(rowIndexEntry1), recorder2(rowIndexEntry2);
for (size_t repeat = 0; repeat != 3; ++repeat) {
for (size_t i = 0; i != batchSize; ++i) {
std::string data = to_string(static_cast<int64_t>(row++));
outStream.write(data.c_str(), data.size());
}
if (repeat == 0) {
outStream.recordPosition(&recorder1);
} else if (repeat == 1) {
outStream.recordPosition(&recorder2);
}
}
outStream.flush();
// try to decompress them
std::unique_ptr<SeekableInputStream> inputStream(
new SeekableArrayInputStream(memStream.getData(), memStream.getLength()));
std::unique_ptr<SeekableInputStream> decompressStream =
createDecompressor(kind,
std::move(inputStream),
blockSize,
*pool);
// prepare positions to seek to
EXPECT_EQ(rowIndexEntry1.positions_size(), rowIndexEntry2.positions_size());
std::list<uint64_t> pos1, pos2;
for (int i = 0; i < rowIndexEntry1.positions_size(); ++i) {
pos1.push_back(rowIndexEntry1.positions(i));
pos2.push_back(rowIndexEntry2.positions(i));
}
PositionProvider provider1(pos1), provider2(pos2);
const void* data;
int size;
// seek to positions between first two batches
decompressStream->seek(provider1);
decompressStream->Next(&data, &size);
std::string data1(static_cast<const char*>(data), 4);
std::string expected1 = "1024";
EXPECT_EQ(expected1, data1);
// seek to positions between last two batches
decompressStream->seek(provider2);
decompressStream->Next(&data, &size);
std::string data2(static_cast<const char*>(data), 4);
std::string expected2 = "2048";
EXPECT_EQ(expected2, data2);
}
TEST(Compression, seekDecompressionStream) {
testSeekDecompressionStream(CompressionKind_ZSTD);
testSeekDecompressionStream(CompressionKind_ZLIB);
}
}