blob: cb269068b36b239eb66d793ff201258877f991b3 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "unit/Catch.h"
#include "unit/TestBase.h"
#include "sitetosite/CompressionOutputStream.h"
#include "sitetosite/CompressionInputStream.h"
#include "io/BufferStream.h"
#include "io/ZlibStream.h"
namespace org::apache::nifi::minifi::test {
/**
 * Consumes the next four bytes of the stream and requires each of them to
 * equal the element of sitetosite::SYNC_BYTES at the same position.
 */
void verifySyncBytes(io::BufferStream& buffer_stream) {
  std::vector<std::byte> header(4);
  buffer_stream.read(std::span(header));
  for (size_t idx = 0; idx < header.size(); ++idx) {
    REQUIRE(std::to_integer<char>(header[idx]) == sitetosite::SYNC_BYTES[idx]);
  }
}
/**
 * Reads a 32-bit unsigned value from the stream and requires it to equal
 * expected_size.
 */
void verifyOriginalSize(io::BufferStream& buffer_stream, uint32_t expected_size) {
  uint32_t size_from_stream{};
  buffer_stream.read(size_from_stream);
  REQUIRE(size_from_stream == expected_size);
}
/**
 * Verifies one compressed payload at the current read position of the stream.
 *
 * Reads a 32-bit compressed length followed by that many bytes, inflates them
 * with a ZLIB-format decompressor, and requires the decompressed bytes to be a
 * sequence of big-endian 32-bit integers that are all equal to 42.
 *
 * @param compressed_buffer_stream stream positioned at the compressed-size field
 * @param expected_size number of decompressed bytes to verify; must be a multiple of 4
 */
void verifyCompressedData(io::BufferStream& compressed_buffer_stream, uint32_t expected_size) {
  // The value check below consumes four bytes per iteration; any other size
  // would make it index past the end of the decompressed buffer.
  REQUIRE(expected_size % 4 == 0);

  uint32_t compressed_size = 0;
  compressed_buffer_stream.read(compressed_size);
  std::vector<std::byte> compressed_data_buffer(compressed_size);
  // A short read means the stream was truncated; fail here rather than on the inflated content.
  REQUIRE(compressed_buffer_stream.read(std::span(compressed_data_buffer)) == compressed_size);

  io::BufferStream decompressed_data_stream;
  io::ZlibDecompressStream decompressor(gsl::make_not_null(&decompressed_data_stream), io::ZlibCompressionFormat::ZLIB);
  decompressor.write(compressed_data_buffer);
  REQUIRE(decompressor.isFinished());

  std::vector<std::byte> decompressed_data_buffer(expected_size);
  // Fewer decompressed bytes than expected would otherwise leave zero-filled padding in the buffer.
  REQUIRE(decompressed_data_stream.read(std::span(decompressed_data_buffer)) == expected_size);

  for (size_t i = 0; i < decompressed_data_buffer.size(); i += 4) {
    // Reassemble the 32-bit value from its four bytes, most significant byte first.
    const uint32_t value = (static_cast<uint32_t>(std::to_integer<uint8_t>(decompressed_data_buffer[i])) << 24)
        | (static_cast<uint32_t>(std::to_integer<uint8_t>(decompressed_data_buffer[i + 1])) << 16)
        | (static_cast<uint32_t>(std::to_integer<uint8_t>(decompressed_data_buffer[i + 2])) << 8)
        | static_cast<uint32_t>(std::to_integer<uint8_t>(decompressed_data_buffer[i + 3]));
    REQUIRE(value == 42);
  }
}
/**
 * Reads a single byte from the stream and requires it to be 1.
 */
void verifyContinueByte(io::BufferStream& buffer_stream) {
  // Starts at 2, which differs from the expected value; presumably so that a
  // read leaving the variable untouched cannot satisfy the check.
  uint8_t continue_marker{2};
  buffer_stream.read(continue_marker);
  REQUIRE(continue_marker == 1);
}
/**
 * Reads a single byte from the stream and requires it to be 0.
 */
void verifyClosingByte(io::BufferStream& buffer_stream) {
  // Starts at 2, which differs from the expected value; presumably so that a
  // read leaving the variable untouched cannot satisfy the check.
  uint8_t end_marker{2};
  buffer_stream.read(end_marker);
  REQUIRE(end_marker == 0);
}
/**
 * Walks the stream chunk by chunk until expected_size bytes of original data
 * have been accounted for.
 *
 * Each chunk covers at most sitetosite::COMPRESSION_BUFFER_SIZE bytes of
 * original data and is checked as: sync bytes, original size, compressed data.
 * Every chunk except the first must be preceded by a continue byte, and a
 * closing byte must follow the last chunk.
 */
void verifyCompressedChunks(io::BufferStream& buffer_stream, uint32_t expected_size) {
  uint32_t remaining = expected_size;
  bool continue_byte_expected = false;
  while (remaining > 0) {
    // Clamp the chunk to the compression buffer size.
    uint32_t chunk_size = remaining;
    if (chunk_size > sitetosite::COMPRESSION_BUFFER_SIZE) {
      chunk_size = sitetosite::COMPRESSION_BUFFER_SIZE;
    }

    if (continue_byte_expected) {
      verifyContinueByte(buffer_stream);
    }
    continue_byte_expected = true;

    verifySyncBytes(buffer_stream);
    verifyOriginalSize(buffer_stream, chunk_size);
    verifyCompressedData(buffer_stream, chunk_size);

    remaining -= chunk_size;
  }
  verifyClosingByte(buffer_stream);
}
// Closing an output stream that was never written to is expected to leave the
// underlying buffer empty.
TEST_CASE("Write empty output stream", "[CompressionOutputStream]") {
  io::BufferStream sink;
  sitetosite::CompressionOutputStream compressor(sink);

  compressor.close();

  REQUIRE(sink.size() == 0);
}
// A single 32-bit value followed by an explicit flush is expected to produce
// one well-formed compressed chunk holding four bytes of original data.
TEST_CASE("Write a 4 byte integer and flush", "[CompressionOutputStream]") {
  io::BufferStream sink;
  sitetosite::CompressionOutputStream compressor(sink);
  const uint32_t value = 42;

  CHECK(compressor.write(value) == 4);
  compressor.flush();

  verifyCompressedChunks(sink, 4);
}
// 10000 32-bit values (40000 bytes) are expected to stay out of the underlying
// buffer until close() is called, and to arrive there as well-formed chunks.
TEST_CASE("Write a single chunk of compressed data and flush on close", "[CompressionOutputStream]") {
  io::BufferStream sink;
  sitetosite::CompressionOutputStream compressor(sink);
  const uint32_t value = 42;

  for (size_t left = 10000; left > 0; --left) {
    CHECK(compressor.write(value) == 4);
  }
  // Nothing has been emitted to the sink before the stream is closed.
  REQUIRE(sink.size() == 0);

  compressor.close();
  REQUIRE(sink.size() > 0);

  verifyCompressedChunks(sink, 40000);
}
// Two batches of 10000 32-bit values (80000 bytes in total): the first batch
// is expected to leave the sink empty, the second to push compressed output to
// it before close() is called.
TEST_CASE("Write 2 chunks of compressed data and flush on demand", "[CompressionOutputStream]") {
  io::BufferStream sink;
  sitetosite::CompressionOutputStream compressor(sink);

  const auto write_batch = [&compressor] {
    for (size_t written = 0; written < 10000; ++written) {
      CHECK(compressor.write(static_cast<uint32_t>(42)) == 4);
    }
  };

  write_batch();
  REQUIRE(sink.size() == 0);

  write_batch();
  // Output is expected in the sink already, without an explicit flush or close.
  REQUIRE(sink.size() > 0);

  compressor.close();
  verifyCompressedChunks(sink, 80000);
}
// Writes 40000 32-bit values (160000 bytes) in three batches and verifies the
// resulting chunk sequence after closing the stream.
TEST_CASE("Write 3 chunks of compressed data and flush on demand", "[CompressionOutputStream]") {
  io::BufferStream sink;
  sitetosite::CompressionOutputStream compressor(sink);

  const auto write_values = [&compressor](size_t how_many) {
    for (size_t written = 0; written < how_many; ++written) {
      CHECK(compressor.write(static_cast<uint32_t>(42)) == 4);
    }
  };

  write_values(10000);
  REQUIRE(sink.size() == 0);

  write_values(10000);
  // Output is expected in the sink already, without an explicit flush or close.
  REQUIRE(sink.size() > 0);

  write_values(20000);
  compressor.close();

  verifyCompressedChunks(sink, 160000);
}
// Round trip: one 32-bit value written through the compression output stream
// is read back through the compression input stream.
TEST_CASE("Read single 4 byte integer compressed", "[CompressionOutputStream]") {
  io::BufferStream wire;
  sitetosite::CompressionOutputStream compressor(wire);
  CHECK(compressor.write(static_cast<uint32_t>(42)) == 4);
  compressor.flush();

  sitetosite::CompressionInputStream decompressor(wire);
  uint32_t decoded_value = 0;
  CHECK(decompressor.read(decoded_value) == 4);
  CHECK(decoded_value == 42);
}
// Round trip: 10000 identical 32-bit values are written, flushed, and then read
// back one at a time.
TEST_CASE("Read large number of bytes compressed", "[CompressionOutputStream]") {
  io::BufferStream wire;
  sitetosite::CompressionOutputStream compressor(wire);
  for (size_t written = 0; written != 10000; ++written) {
    CHECK(compressor.write(static_cast<uint32_t>(42)) == 4);
  }
  compressor.flush();

  sitetosite::CompressionInputStream decompressor(wire);
  for (size_t consumed = 0; consumed != 10000; ++consumed) {
    uint32_t decoded_value = 0;
    CHECK(decompressor.read(decoded_value) == 4);
    CHECK(decoded_value == 42);
  }
}
// Round trip with distinct values: writes 1, 2, 3, ... until the size of the
// underlying buffer plus 100 reaches sitetosite::COMPRESSION_BUFFER_SIZE, then
// reads the same sequence back in order.
TEST_CASE("Read large number of bytes that uses multiple buffers", "[CompressionOutputStream]") {
  io::BufferStream wire;
  sitetosite::CompressionOutputStream compressor(wire);

  uint32_t values_written = 0;
  while (wire.size() + 100 < sitetosite::COMPRESSION_BUFFER_SIZE) {
    // The value written equals its 1-based position in the sequence.
    ++values_written;
    CHECK(compressor.write(values_written) == 4);
  }
  compressor.flush();

  sitetosite::CompressionInputStream decompressor(wire);
  for (size_t expected_value = 1; expected_value <= values_written; ++expected_value) {
    uint32_t decoded_value = 0;
    CHECK(decompressor.read(decoded_value) == 4);
    CHECK(decoded_value == expected_value);
  }
}
} // namespace org::apache::nifi::minifi::test