blob: 31e1e4388a013f0fdac14b079ccf17b7123b443b [file] [log] [blame]
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "sitetosite/CompressionOutputStream.h"
#include <algorithm>
#include "io/ZlibStream.h"
#include "io/StreamPipe.h"
#include "io/BufferStream.h"
#include "core/logging/LoggerFactory.h"
namespace org::apache::nifi::minifi::sitetosite {
size_t CompressionOutputStream::write(const uint8_t *value, size_t len) {
if (value == nullptr || len == 0) {
return 0;
}
const std::span<const std::byte> input_data{reinterpret_cast<const std::byte*>(value), len};
auto remaining_data = input_data;
size_t total_bytes_written = 0;
while (!remaining_data.empty()) {
const auto free_spaces_left_in_buffer = buffer_.size() - buffer_offset_;
const auto bytes_to_write = std::min(remaining_data.size(), free_spaces_left_in_buffer);
const auto chunk_to_write = remaining_data.subspan(0, bytes_to_write);
const auto buffer_destination = std::span<std::byte>(buffer_).subspan(buffer_offset_, bytes_to_write);
std::ranges::copy(chunk_to_write, buffer_destination.begin());
total_bytes_written += bytes_to_write;
remaining_data = remaining_data.subspan(bytes_to_write);
buffer_offset_ += bytes_to_write;
gsl_Assert(buffer_offset_ <= buffer_.size());
if (buffer_offset_ == buffer_.size()) {
auto ret = compressAndWrite();
if (io::isError(ret)) {
return ret;
}
}
}
return total_bytes_written;
}
size_t CompressionOutputStream::compressAndWrite() {
if (was_data_written_) {
// Write a continue byte to indicate that there is more data to follow
auto ret = internal_stream_.write(static_cast<uint8_t>(1));
if (io::isError(ret)) {
logger_->log_error("Failed to write continue byte before compression: {}", ret);
return ret;
}
}
auto ret = internal_stream_.write(reinterpret_cast<const uint8_t *>(SYNC_BYTES.data()), SYNC_BYTES.size());
if (io::isError(ret)) {
logger_->log_error("Failed to write sync bytes before compression: {}", ret);
return ret;
}
io::BufferStream buffer_stream;
{
io::ZlibCompressStream zlib_stream{gsl::make_not_null(&buffer_stream), io::ZlibCompressionFormat::ZLIB, Z_BEST_SPEED};
ret = zlib_stream.write(gsl::make_span(buffer_).subspan(0, buffer_offset_));
if (io::isError(ret)) {
logger_->log_error("Failed to write data to zlib stream: {}", ret);
return ret;
}
gsl_Assert(buffer_offset_ == ret);
zlib_stream.close();
gsl_Assert(zlib_stream.isFinished());
}
// Write the original size of the data before compression
ret = internal_stream_.write(gsl::narrow<uint32_t>(buffer_offset_));
if (io::isError(ret)) {
logger_->log_error("Failed to write original size before compression: {}", ret);
return ret;
}
// Write the compressed size of the data
ret = internal_stream_.write(gsl::narrow<uint32_t>(buffer_stream.size()));
if (io::isError(ret)) {
return ret;
}
// Write the compressed data
ret = internal::pipe(buffer_stream, internal_stream_);
if (io::isError(ret)) {
return ret;
}
buffer_offset_ = 0;
was_data_written_ = true;
return ret;
}
void CompressionOutputStream::flush() {
if (buffer_offset_ > 0) {
auto ret = compressAndWrite();
if (io::isError(ret)) {
logger_->log_error("Flush failed when compressing data: {}", ret);
return;
}
}
if (was_data_written_) {
was_data_written_ = false;
auto ret = internal_stream_.write(static_cast<uint8_t>(0));
if (io::isError(ret)) {
logger_->log_error("Flush failed when writing on internal stream: {}", ret);
return;
}
}
}
void CompressionOutputStream::close() {
flush();
internal_stream_.close();
}
} // namespace org::apache::nifi::minifi::sitetosite