| /** |
| * @file CompressContent.h |
| * CompressContent class declaration |
| * |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| #ifndef __COMPRESS_CONTENT_H__ |
| #define __COMPRESS_CONTENT_H__ |
| |
#include <algorithm>
#include <cerrno>
#include <cinttypes>
#include <map>
#include <memory>
#include <string>
#include <utility>
#include <vector>

#include "archive_entry.h"
#include "archive.h"

#include "FlowFileRecord.h"
#include "core/Processor.h"
#include "core/ProcessSession.h"
#include "core/Core.h"
#include "core/Resource.h"
#include "core/Property.h"
#include "core/logging/LoggerConfiguration.h"
#include "io/ZlibStream.h"
#include "utils/Enum.h"
#include "utils/gsl.h"
| |
| namespace org { |
| namespace apache { |
| namespace nifi { |
| namespace minifi { |
| namespace processors { |
| |
| // CompressContent Class |
| class CompressContent: public core::Processor { |
| public: |
| // Constructor |
| /*! |
| * Create a new processor |
| */ |
| explicit CompressContent(std::string name, utils::Identifier uuid = utils::Identifier()) |
| : core::Processor(name, uuid) |
| , logger_(logging::LoggerFactory<CompressContent>::getLogger()) |
| , updateFileName_(false) |
| , encapsulateInTar_(false) { |
| } |
| // Destructor |
| virtual ~CompressContent() = default; |
| // Processor Name |
| static constexpr char const* ProcessorName = "CompressContent"; |
| // Supported Properties |
| static core::Property CompressMode; |
| static core::Property CompressLevel; |
| static core::Property CompressFormat; |
| static core::Property UpdateFileName; |
| static core::Property EncapsulateInTar; |
| static core::Property BatchSize; |
| |
| // Supported Relationships |
| static core::Relationship Failure; |
| static core::Relationship Success; |
| |
| SMART_ENUM(CompressionMode, |
| (Compress, "compress"), |
| (Decompress, "decompress") |
| ) |
| |
| SMART_ENUM(CompressionFormat, |
| (GZIP, "gzip"), |
| (LZMA, "lzma"), |
| (XZ_LZMA2, "xz-lzma2"), |
| (BZIP2, "bzip2") |
| ) |
| |
| SMART_ENUM_EXTEND(ExtendedCompressionFormat, CompressionFormat, (GZIP, LZMA, XZ_LZMA2, BZIP2), |
| (USE_MIME_TYPE, "use mime.type attribute") |
| ) |
| |
| public: |
| // Nest Callback Class for read stream from flow for compress |
| class ReadCallbackCompress: public InputStreamCallback { |
| public: |
| ReadCallbackCompress(std::shared_ptr<core::FlowFile> &flow, struct archive *arch, struct archive_entry *entry) : |
| flow_(flow), arch_(arch), entry_(entry), status_(0), logger_(logging::LoggerFactory<CompressContent>::getLogger()) { |
| } |
| ~ReadCallbackCompress() = default; |
| int64_t process(const std::shared_ptr<io::BaseStream>& stream) { |
| uint8_t buffer[4096U]; |
| int64_t ret = 0; |
| uint64_t read_size = 0; |
| |
| ret = archive_write_header(arch_, entry_); |
| if (ret != ARCHIVE_OK) { |
| logger_->log_error("Compress Content archive error %s", archive_error_string(arch_)); |
| status_ = -1; |
| return -1; |
| } |
| while (read_size < flow_->getSize()) { |
| ret = stream->read(buffer, sizeof(buffer)); |
| if (ret < 0) { |
| status_ = -1; |
| return -1; |
| } |
| if (ret > 0) { |
| ret = archive_write_data(arch_, buffer, gsl::narrow<size_t>(ret)); |
| if (ret < 0) { |
| logger_->log_error("Compress Content archive error %s", archive_error_string(arch_)); |
| status_ = -1; |
| return -1; |
| } |
| read_size += ret; |
| } else { |
| break; |
| } |
| } |
| return read_size; |
| } |
| std::shared_ptr<core::FlowFile> flow_; |
| struct archive *arch_; |
| struct archive_entry *entry_; |
| int status_; |
| std::shared_ptr<logging::Logger> logger_; |
| }; |
| // Nest Callback Class for read stream from flow for decompress |
| class ReadCallbackDecompress: public InputStreamCallback { |
| public: |
| ReadCallbackDecompress(const std::shared_ptr<core::FlowFile> &flow) : |
| read_size_(0), offset_(0), flow_(flow) { |
| origin_offset_ = flow_->getOffset(); |
| } |
| ~ReadCallbackDecompress() = default; |
| int64_t process(const std::shared_ptr<io::BaseStream>& stream) { |
| read_size_ = 0; |
| stream->seek(offset_); |
| int readRet = stream->read(buffer_, sizeof(buffer_)); |
| read_size_ = readRet; |
| if (readRet > 0) { |
| offset_ += read_size_; |
| } |
| return readRet; |
| } |
| int64_t read_size_; |
| uint8_t buffer_[8192]; |
| uint64_t offset_; |
| uint64_t origin_offset_; |
| std::shared_ptr<core::FlowFile> flow_; |
| }; |
| // Nest Callback Class for write stream |
| class WriteCallback: public OutputStreamCallback { |
| public: |
| WriteCallback(CompressionMode compress_mode, int compress_level, CompressionFormat compress_format, |
| const std::shared_ptr<core::FlowFile> &flow, const std::shared_ptr<core::ProcessSession> &session) : |
| compress_mode_(compress_mode), compress_level_(compress_level), compress_format_(compress_format), |
| flow_(flow), session_(session), |
| logger_(logging::LoggerFactory<CompressContent>::getLogger()), |
| readDecompressCb_(flow) { |
| size_ = 0; |
| stream_ = nullptr; |
| status_ = 0; |
| } |
| ~WriteCallback() = default; |
| |
| CompressionMode compress_mode_; |
| int compress_level_; |
| CompressionFormat compress_format_; |
| std::shared_ptr<core::FlowFile> flow_; |
| std::shared_ptr<core::ProcessSession> session_; |
| std::shared_ptr<io::BaseStream> stream_; |
| int64_t size_; |
| std::shared_ptr<logging::Logger> logger_; |
| CompressContent::ReadCallbackDecompress readDecompressCb_; |
| int status_; |
| |
| static la_ssize_t archive_write(struct archive* /*arch*/, void *context, const void *buff, size_t size) { |
| WriteCallback *callback = (WriteCallback *) context; |
| la_ssize_t ret = callback->stream_->write(reinterpret_cast<uint8_t*>(const_cast<void*>(buff)), gsl::narrow<int>(size)); |
| if (ret > 0) |
| callback->size_ += (int64_t) ret; |
| return ret; |
| } |
| |
| static la_ssize_t archive_read(struct archive *arch, void *context, const void **buff) { |
| WriteCallback *callback = (WriteCallback *) context; |
| callback->session_->read(callback->flow_, &callback->readDecompressCb_); |
| if (callback->readDecompressCb_.read_size_ >= 0) { |
| *buff = callback->readDecompressCb_.buffer_; |
| return gsl::narrow<la_ssize_t>(callback->readDecompressCb_.read_size_); |
| } else { |
| archive_set_error(arch, EIO, "Error reading flowfile"); |
| return -1; |
| } |
| } |
| |
| static la_int64_t archive_skip(struct archive* /*a*/, void* /*client_data*/, la_int64_t /*request*/) { |
| return 0; |
| } |
| |
| void archive_write_log_error_cleanup(struct archive *arch) { |
| logger_->log_error("Compress Content archive write error %s", archive_error_string(arch)); |
| status_ = -1; |
| archive_write_free(arch); |
| } |
| |
| void archive_read_log_error_cleanup(struct archive *arch) { |
| logger_->log_error("Compress Content archive read error %s", archive_error_string(arch)); |
| status_ = -1; |
| archive_read_free(arch); |
| } |
| |
| int64_t process(const std::shared_ptr<io::BaseStream>& stream) { |
| struct archive *arch; |
| int r; |
| |
| if (compress_mode_ == CompressionMode::Compress) { |
| arch = archive_write_new(); |
| if (!arch) { |
| status_ = -1; |
| return -1; |
| } |
| r = archive_write_set_format_ustar(arch); |
| if (r != ARCHIVE_OK) { |
| archive_write_log_error_cleanup(arch); |
| return -1; |
| } |
| if (compress_format_ == CompressionFormat::GZIP) { |
| r = archive_write_add_filter_gzip(arch); |
| if (r != ARCHIVE_OK) { |
| archive_write_log_error_cleanup(arch); |
| return -1; |
| } |
| std::string option; |
| option = "gzip:compression-level=" + std::to_string(compress_level_); |
| r = archive_write_set_options(arch, option.c_str()); |
| if (r != ARCHIVE_OK) { |
| archive_write_log_error_cleanup(arch); |
| return -1; |
| } |
| } else if (compress_format_ == CompressionFormat::BZIP2) { |
| r = archive_write_add_filter_bzip2(arch); |
| if (r != ARCHIVE_OK) { |
| archive_write_log_error_cleanup(arch); |
| return -1; |
| } |
| } else if (compress_format_ == CompressionFormat::LZMA) { |
| r = archive_write_add_filter_lzma(arch); |
| if (r != ARCHIVE_OK) { |
| archive_write_log_error_cleanup(arch); |
| return -1; |
| } |
| } else if (compress_format_ == CompressionFormat::XZ_LZMA2) { |
| r = archive_write_add_filter_xz(arch); |
| if (r != ARCHIVE_OK) { |
| archive_write_log_error_cleanup(arch); |
| return -1; |
| } |
| } else { |
| archive_write_log_error_cleanup(arch); |
| return -1; |
| } |
| r = archive_write_set_bytes_per_block(arch, 0); |
| if (r != ARCHIVE_OK) { |
| archive_write_log_error_cleanup(arch); |
| return -1; |
| } |
| this->stream_ = stream; |
| r = archive_write_open(arch, this, NULL, archive_write, NULL); |
| if (r != ARCHIVE_OK) { |
| archive_write_log_error_cleanup(arch); |
| return -1; |
| } |
| struct archive_entry *entry = archive_entry_new(); |
| if (!entry) { |
| archive_write_log_error_cleanup(arch); |
| return -1; |
| } |
| std::string fileName; |
| flow_->getAttribute(core::SpecialFlowAttribute::FILENAME, fileName); |
| archive_entry_set_pathname(entry, fileName.c_str()); |
| archive_entry_set_size(entry, flow_->getSize()); |
| archive_entry_set_mode(entry, S_IFREG | 0755); |
| ReadCallbackCompress readCb(flow_, arch, entry); |
| session_->read(flow_, &readCb); |
| if (readCb.status_ < 0) { |
| archive_entry_free(entry); |
| archive_write_log_error_cleanup(arch); |
| status_ = -1; |
| return -1; |
| } |
| archive_entry_free(entry); |
| archive_write_close(arch); |
| archive_write_free(arch); |
| return size_; |
| } else { |
| arch = archive_read_new(); |
| if (!arch) { |
| status_ = -1; |
| return -1; |
| } |
| r = archive_read_support_format_all(arch); |
| if (r != ARCHIVE_OK) { |
| archive_read_log_error_cleanup(arch); |
| return -1; |
| } |
| r = archive_read_support_filter_all(arch); |
| if (r != ARCHIVE_OK) { |
| archive_read_log_error_cleanup(arch); |
| return -1; |
| } |
| this->stream_ = stream; |
| r = archive_read_open2(arch, this, NULL, archive_read, archive_skip, NULL); |
| if (r != ARCHIVE_OK) { |
| archive_read_log_error_cleanup(arch); |
| return -1; |
| } |
| struct archive_entry *entry; |
| if (archive_read_next_header(arch, &entry) != ARCHIVE_OK) { |
| archive_read_log_error_cleanup(arch); |
| return -1; |
| } |
| int64_t entry_size = archive_entry_size(entry); |
| logger_->log_debug("Decompress Content archive entry size %" PRId64, entry_size); |
| size_ = 0; |
| while (size_ < entry_size) { |
| char buffer[8192]; |
| const auto read_result = archive_read_data(arch, buffer, sizeof(buffer)); |
| if (read_result < 0) { |
| archive_read_log_error_cleanup(arch); |
| return -1; |
| } |
| if (read_result == 0) |
| break; |
| size_ += read_result; |
| const auto write_result = stream_->write(reinterpret_cast<uint8_t*>(buffer), gsl::narrow<int>(read_result)); |
| if (write_result < 0) { |
| archive_read_log_error_cleanup(arch); |
| return -1; |
| } |
| } |
| archive_read_close(arch); |
| archive_read_free(arch); |
| return size_; |
| } |
| } |
| }; |
| |
| class GzipWriteCallback : public OutputStreamCallback { |
| public: |
| GzipWriteCallback(CompressionMode compress_mode, int compress_level, std::shared_ptr<core::FlowFile> flow, std::shared_ptr<core::ProcessSession> session) |
| : logger_(logging::LoggerFactory<CompressContent>::getLogger()) |
| , compress_mode_(std::move(compress_mode)) |
| , compress_level_(compress_level) |
| , flow_(std::move(flow)) |
| , session_(std::move(session)) { |
| } |
| |
| std::shared_ptr<logging::Logger> logger_; |
| CompressionMode compress_mode_; |
| int compress_level_; |
| std::shared_ptr<core::FlowFile> flow_; |
| std::shared_ptr<core::ProcessSession> session_; |
| bool success_{false}; |
| |
| int64_t process(const std::shared_ptr<io::BaseStream>& outputStream) override { |
| class ReadCallback : public InputStreamCallback { |
| public: |
| ReadCallback(GzipWriteCallback& writer, std::shared_ptr<io::OutputStream> outputStream) |
| : writer_(writer) |
| , outputStream_(std::move(outputStream)) { |
| } |
| |
| int64_t process(const std::shared_ptr<io::BaseStream>& inputStream) override { |
| std::vector<uint8_t> buffer(16 * 1024U); |
| int64_t read_size = 0; |
| while (read_size < gsl::narrow<int64_t>(writer_.flow_->getSize())) { |
| int ret = inputStream->read(buffer.data(), gsl::narrow<int>(buffer.size())); |
| if (ret < 0) { |
| return -1; |
| } else if (ret == 0) { |
| break; |
| } else { |
| if (outputStream_->write(buffer.data(), ret) != ret) { |
| return -1; |
| } |
| read_size += ret; |
| } |
| } |
| outputStream_->close(); |
| return read_size; |
| } |
| |
| GzipWriteCallback& writer_; |
| std::shared_ptr<io::OutputStream> outputStream_; |
| }; |
| |
| std::shared_ptr<io::ZlibBaseStream> filterStream; |
| if (compress_mode_ == CompressionMode::Compress) { |
| filterStream = std::make_shared<io::ZlibCompressStream>(gsl::make_not_null(outputStream.get()), io::ZlibCompressionFormat::GZIP, compress_level_); |
| } else { |
| filterStream = std::make_shared<io::ZlibDecompressStream>(gsl::make_not_null(outputStream.get()), io::ZlibCompressionFormat::GZIP); |
| } |
| ReadCallback readCb(*this, filterStream); |
| session_->read(flow_, &readCb); |
| |
| success_ = filterStream->isFinished(); |
| |
| return flow_->getSize(); |
| } |
| }; |
| |
| public: |
| /** |
| * Function that's executed when the processor is scheduled. |
| * @param context process context. |
| * @param sessionFactory process session factory that is used when creating |
| * ProcessSession objects. |
| */ |
| void onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory); |
| // OnTrigger method, implemented by NiFi CompressContent |
| virtual void onTrigger(core::ProcessContext* /*context*/, core::ProcessSession* /*session*/) { |
| } |
| // OnTrigger method, implemented by NiFi CompressContent |
| virtual void onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session); |
| // Initialize, over write by NiFi CompressContent |
| virtual void initialize(void); |
| |
| private: |
| static std::string toMimeType(CompressionFormat format); |
| |
| void processFlowFile(const std::shared_ptr<core::FlowFile>& flowFile, const std::shared_ptr<core::ProcessSession>& session); |
| |
| std::shared_ptr<logging::Logger> logger_; |
| int compressLevel_; |
| CompressionMode compressMode_; |
| ExtendedCompressionFormat compressFormat_; |
| bool updateFileName_; |
| bool encapsulateInTar_; |
| uint32_t batchSize_{1}; |
| static const std::map<std::string, CompressionFormat> compressionFormatMimeTypeMap_; |
| static const std::map<CompressionFormat, std::string> fileExtension_; |
| }; |
| |
| REGISTER_RESOURCE(CompressContent, "Compresses or decompresses the contents of FlowFiles using a user-specified compression algorithm and updates the mime.type attribute as appropriate"); |
| |
| } /* namespace processors */ |
| } /* namespace minifi */ |
| } /* namespace nifi */ |
| } /* namespace apache */ |
| } /* namespace org */ |
| |
| #endif |