blob: 5bc32f1c47a436b6ebe3f157923f3155aeb515de [file] [log] [blame]
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "sitetosite/CompressionInputStream.h"
#include <algorithm>
#include "io/ZlibStream.h"
namespace org::apache::nifi::minifi::sitetosite {
/**
 * Reads the next chunk from internal_stream_ and inflates it into buffer_.
 *
 * Chunk layout as parsed here: SYNC_BYTES, a uint32 original (decompressed) size, a uint32
 * compressed size, `compressed size` bytes of zlib data, and finally one end byte
 * (0 = this was the last chunk, 1 = more chunks follow).
 *
 * On success buffered_data_length_ is set to the decompressed size and buffer_offset_ to 0.
 *
 * @return the number of decompressed bytes now held in buffer_; 0 if the final chunk was
 *         already consumed earlier; io::STREAM_ERROR if the chunk is malformed or any read
 *         from internal_stream_ fails.
 */
size_t CompressionInputStream::decompressData() {
  if (eof_) {
    return 0;
  }

  // Only the sync marker is read here; the payload buffer is allocated later, once its
  // length is known and has been bounds-checked.
  std::vector<std::byte> sync_buffer(SYNC_BYTES.size());
  auto ret = internal_stream_.read(std::span(sync_buffer));
  if (ret != SYNC_BYTES.size() ||
      !std::equal(SYNC_BYTES.begin(), SYNC_BYTES.end(), sync_buffer.begin(), [](char sync_char, std::byte read_byte) { return static_cast<std::byte>(sync_char) == read_byte;})) {
    logger_->log_error("Failed to read sync bytes or sync bytes do not match");
    return io::STREAM_ERROR;
  }

  uint32_t original_size = 0;
  ret = internal_stream_.read(original_size);
  if (io::isError(ret) || ret != sizeof(uint32_t)) {
    logger_->log_error("Failed to read original size, ret: {}", ret);
    return io::STREAM_ERROR;
  }

  uint32_t compressed_size = 0;
  ret = internal_stream_.read(compressed_size);
  if (io::isError(ret) || ret != sizeof(uint32_t)) {
    logger_->log_error("Failed to read compressed size, ret: {}", ret);
    return io::STREAM_ERROR;
  }

  if (compressed_size == 0 && original_size != 0) {
    logger_->log_error("Compressed size is 0 but original size is not");
    return io::STREAM_ERROR;
  }
  // Both sizes come from the stream itself, so cap them before allocating or writing into buffer_.
  if (compressed_size > COMPRESSION_BUFFER_SIZE) {
    logger_->log_error("Compressed size exceeds buffer size");
    return io::STREAM_ERROR;
  }
  if (original_size > COMPRESSION_BUFFER_SIZE) {
    logger_->log_error("Original size exceeds buffer size");
    return io::STREAM_ERROR;
  }

  std::vector<std::byte> compressed_data(compressed_size);
  ret = internal_stream_.read(std::span(compressed_data));
  if (io::isError(ret) || ret != compressed_size) {
    logger_->log_error("Failed to read compressed data, ret: {}", ret);
    return io::STREAM_ERROR;
  }

  if (compressed_size != 0) {
    io::BufferStream decompressed_data_stream;
    io::ZlibDecompressStream zlib_stream{gsl::make_not_null(&decompressed_data_stream), io::ZlibCompressionFormat::ZLIB};
    ret = zlib_stream.write(std::span(compressed_data));
    if (io::isError(ret)) {
      logger_->log_error("Failed to write compressed data to zlib stream, ret: {}", ret);
      return io::STREAM_ERROR;
    }
    zlib_stream.close();
    // The payload was read from internal_stream_ (presumably the remote site-to-site peer), so an
    // incomplete or corrupt zlib stream is a protocol error to report, not an invariant to assert.
    if (!zlib_stream.isFinished()) {
      logger_->log_error("Compressed data did not form a complete zlib stream");
      return io::STREAM_ERROR;
    }
    ret = decompressed_data_stream.read(std::span(buffer_).subspan(0, original_size));
    if (io::isError(ret) || ret != original_size) {
      logger_->log_error("Failed to read decompressed data, ret: {}", ret);
      return io::STREAM_ERROR;
    }
  }

  uint8_t end_byte = 0;
  ret = internal_stream_.read(end_byte);
  if (io::isError(ret) || ret != 1) {
    logger_->log_error("Failed to read end byte, ret: {}", ret);
    return io::STREAM_ERROR;
  }
  // If end_byte is 0, it indicates EOF, if it is 1, it indicates more data will follow
  if (end_byte == 0) {
    eof_ = true;
  } else if (end_byte != 1) {
    logger_->log_error("End byte is not 0 or 1, received: {}", end_byte);
    return io::STREAM_ERROR;
  }

  buffered_data_length_ = original_size;
  buffer_offset_ = 0;
  return original_size;
}
/**
 * Copies up to out_buffer.size() decompressed bytes into out_buffer.
 *
 * Further chunks are pulled from the underlying stream and inflated via decompressData()
 * whenever buffer_ has been fully consumed.
 *
 * @return the number of bytes copied; 0 once the final chunk has been fully consumed (or when
 *         out_buffer is empty); io::STREAM_ERROR if a chunk could not be read or decompressed.
 *         Bytes already copied into out_buffer before such an error are not reported.
 */
size_t CompressionInputStream::read(std::span<std::byte> out_buffer) {
  if (eof_ && buffered_data_length_ == 0) {
    return 0;
  }
  std::span<std::byte> remaining_output = out_buffer;
  size_t total_bytes_read = 0;
  while (!remaining_output.empty()) {
    if (buffered_data_length_ == 0 || buffered_data_length_ == buffer_offset_) {
      if (eof_) {
        break;  // the final chunk has been seen and fully handed out
      }
      // decompressData() accepts a non-final chunk whose original size is 0, so an empty
      // buffer here does not mean end-of-stream: fetch again until data or the end marker arrives.
      if (io::isError(decompressData())) {
        return io::STREAM_ERROR;
      }
      continue;
    }
    const auto bytes_available = buffered_data_length_ - buffer_offset_;
    const auto bytes_to_copy = std::min(bytes_available, remaining_output.size());
    const auto source_data = std::span<const std::byte>(buffer_).subspan(buffer_offset_, bytes_to_copy);
    std::ranges::copy(source_data, remaining_output.begin());
    buffer_offset_ += bytes_to_copy;
    total_bytes_read += bytes_to_copy;
    remaining_output = remaining_output.subspan(bytes_to_copy);
    if (buffer_offset_ == buffered_data_length_) {
      buffer_offset_ = 0;
      buffered_data_length_ = 0;
    }
  }
  return total_bytes_read;
}
void CompressionInputStream::close() {
internal_stream_.close();
}
} // namespace org::apache::nifi::minifi::sitetosite