| /** |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| #pragma once |
| |
| #include <deque> |
| #include <map> |
| #include <utility> |
| #include <vector> |
| #include <memory> |
| #include <string> |
| |
| #include "ArchiveCommon.h" |
| #include "BinFiles.h" |
| #include "archive_entry.h" |
| #include "archive.h" |
| #include "SmartArchivePtrs.h" |
| #include "core/logging/LoggerFactory.h" |
| #include "core/PropertyDefinitionBuilder.h" |
| #include "minifi-cpp/core/PropertyValidator.h" |
| #include "serialization/FlowFileSerializer.h" |
| #include "utils/ArrayUtils.h" |
| #include "utils/gsl.h" |
| #include "utils/Export.h" |
| #include "io/Stream.h" |
| |
| namespace org::apache::nifi::minifi::processors { |
| |
// String constants naming the selectable values of the MergeContent processor properties.
namespace merge_content_options {

// Values accepted by the "Merge Strategy" property
inline constexpr std::string_view MERGE_STRATEGY_BIN_PACK{"Bin-Packing Algorithm"};
inline constexpr std::string_view MERGE_STRATEGY_DEFRAGMENT{"Defragment"};

// Values accepted by the "Merge Format" property
inline constexpr std::string_view MERGE_FORMAT_TAR_VALUE{"TAR"};
inline constexpr std::string_view MERGE_FORMAT_ZIP_VALUE{"ZIP"};
inline constexpr std::string_view MERGE_FORMAT_CONCAT_VALUE{"Binary Concatenation"};
inline constexpr std::string_view MERGE_FORMAT_FLOWFILE_STREAM_V3_VALUE{"FlowFile Stream, v3"};

// Values accepted by the "Delimiter Strategy" property
inline constexpr std::string_view DELIMITER_STRATEGY_FILENAME{"Filename"};
inline constexpr std::string_view DELIMITER_STRATEGY_TEXT{"Text"};

// Values accepted by the "Attribute Strategy" property
inline constexpr std::string_view ATTRIBUTE_STRATEGY_KEEP_COMMON{"Keep Only Common Attributes"};
inline constexpr std::string_view ATTRIBUTE_STRATEGY_KEEP_ALL_UNIQUE{"Keep All Unique Attributes"};

}  // namespace merge_content_options
| |
| class MergeBin { |
| public: |
| virtual ~MergeBin() = default; |
| // merge the flows in the bin |
| virtual void merge(core::ProcessSession &session, |
| std::deque<std::shared_ptr<core::FlowFile>> &flows, FlowFileSerializer& serializer, const std::shared_ptr<core::FlowFile> &flowFile) = 0; |
| }; |
| |
| class BinaryConcatenationMerge : public MergeBin { |
| public: |
| BinaryConcatenationMerge(std::string header, std::string footer, std::string demarcator); |
| |
| void merge(core::ProcessSession &session, |
| std::deque<std::shared_ptr<core::FlowFile>>& flows, FlowFileSerializer& serializer, const std::shared_ptr<core::FlowFile>& merge_flow) override; |
| // Nest Callback Class for write stream |
| class WriteCallback { |
| public: |
| WriteCallback(std::string &header, std::string &footer, std::string &demarcator, |
| std::deque<std::shared_ptr<core::FlowFile>> &flows, FlowFileSerializer& serializer) : |
| header_(header), footer_(footer), demarcator_(demarcator), flows_(flows), serializer_(serializer) { |
| } |
| |
| std::string &header_; |
| std::string &footer_; |
| std::string &demarcator_; |
| std::deque<std::shared_ptr<core::FlowFile>> &flows_; |
| FlowFileSerializer& serializer_; |
| |
| int64_t operator()(const std::shared_ptr<io::OutputStream>& stream) const { |
| size_t write_size_sum = 0; |
| if (!header_.empty()) { |
| const auto write_ret = stream->write(reinterpret_cast<const uint8_t*>(header_.data()), header_.size()); |
| if (io::isError(write_ret)) |
| return -1; |
| write_size_sum += write_ret; |
| } |
| bool isFirst = true; |
| for (const auto& flow : flows_) { |
| if (!isFirst && !demarcator_.empty()) { |
| const auto write_ret = stream->write(reinterpret_cast<const uint8_t*>(demarcator_.data()), demarcator_.size()); |
| if (io::isError(write_ret)) |
| return -1; |
| write_size_sum += write_ret; |
| } |
| const auto len = serializer_.serialize(flow, stream); |
| if (len < 0) |
| return len; |
| write_size_sum += gsl::narrow<size_t>(len); |
| isFirst = false; |
| } |
| if (!footer_.empty()) { |
| const auto write_ret = stream->write(reinterpret_cast<const uint8_t*>(footer_.data()), footer_.size()); |
| if (io::isError(write_ret)) |
| return -1; |
| write_size_sum += write_ret; |
| } |
| return gsl::narrow<int64_t>(write_size_sum); |
| } |
| }; |
| |
| private: |
| std::string header_; |
| std::string footer_; |
| std::string demarcator_; |
| }; |
| |
| |
| class ArchiveMerge { |
| public: |
| class ArchiveWriter : public io::StreamImpl, public io::OutputStream { |
| public: |
| ArchiveWriter(archive& arch, archive_entry& entry) : arch_(arch), entry_(entry) {} |
| size_t write(const uint8_t* data, const size_t size) override { |
| if (!header_emitted_) { |
| if (archive_write_header(&arch_, &entry_) != ARCHIVE_OK) { |
| return io::STREAM_ERROR; |
| } |
| header_emitted_ = true; |
| } |
| size_t totalWrote = 0; |
| size_t remaining = size; |
| while (remaining > 0) { |
| const auto ret = archive_write_data(&arch_, data + totalWrote, remaining); |
| if (ret < 0) { |
| return io::STREAM_ERROR; |
| } |
| const auto zret = gsl::narrow<size_t>(ret); |
| if (zret == 0) { |
| break; |
| } |
| totalWrote += zret; |
| remaining -= zret; |
| } |
| return totalWrote; |
| } |
| |
| private: |
| archive& arch_; |
| archive_entry& entry_; |
| bool header_emitted_{false}; |
| }; |
| // Nest Callback Class for write stream |
| class WriteCallback { |
| public: |
| WriteCallback(std::string_view merge_type, std::deque<std::shared_ptr<core::FlowFile>> &flows, FlowFileSerializer& serializer) |
| : merge_type_(merge_type), |
| flows_(flows), |
| serializer_(serializer) { |
| } |
| |
| std::string merge_type_; |
| std::deque<std::shared_ptr<core::FlowFile>> &flows_; |
| std::shared_ptr<io::OutputStream> stream_ = nullptr; |
| size_t size_ = 0; |
| std::shared_ptr<core::logging::Logger> logger_ = core::logging::LoggerFactory<ArchiveMerge>::getLogger(); |
| FlowFileSerializer& serializer_; |
| |
| static la_ssize_t archive_write(struct archive* /*arch*/, void *context, const void *buff, size_t size) { |
| auto* callback = static_cast<WriteCallback *>(context); |
| const auto* data = static_cast<uint8_t*>(const_cast<void*>(buff)); |
| la_ssize_t totalWrote = 0; |
| size_t remaining = size; |
| while (remaining > 0) { |
| const auto ret = callback->stream_->write(data + totalWrote, remaining); |
| if (io::isError(ret)) { |
| // libarchive expects us to return -1 on error |
| return -1; |
| } |
| if (ret == 0) { |
| break; |
| } |
| callback->size_ += ret; |
| totalWrote += static_cast<la_ssize_t>(ret); |
| remaining -= ret; |
| } |
| return totalWrote; |
| } |
| |
| int64_t operator()(const std::shared_ptr<io::OutputStream>& stream) { |
| const auto arch = archive_write_unique_ptr{archive_write_new()}; |
| |
| if (merge_type_ == merge_content_options::MERGE_FORMAT_TAR_VALUE) { |
| archive_write_set_format_pax_restricted(arch.get()); // tar format |
| } |
| if (merge_type_ == merge_content_options::MERGE_FORMAT_ZIP_VALUE) { |
| archive_write_set_format_zip(arch.get()); // zip format |
| } |
| archive_write_set_bytes_per_block(arch.get(), 0); |
| archive_write_add_filter_none(arch.get()); |
| stream_ = stream; |
| archive_write_open(arch.get(), this, nullptr, archive_write, nullptr); |
| |
| for (const auto& flow : flows_) { |
| auto entry = archive_entry_unique_ptr{archive_entry_new()}; |
| std::string fileName; |
| flow->getAttribute(core::SpecialFlowAttribute::FILENAME, fileName); |
| archive_entry_set_pathname(entry.get(), fileName.c_str()); |
| archive_entry_set_size(entry.get(), gsl::narrow<la_int64_t>(flow->getSize())); |
| archive_entry_set_mode(entry.get(), S_IFREG | 0755); |
| if (merge_type_ == merge_content_options::MERGE_FORMAT_TAR_VALUE) { |
| if (std::string perm; flow->getAttribute(BinFiles::TAR_PERMISSIONS_ATTRIBUTE, perm)) { |
| try { |
| const int perm_int = std::stoi(perm); |
| logger_->log_debug("Merge Tar File {} permission {}", fileName, perm); |
| archive_entry_set_perm(entry.get(), static_cast<mode_t>(perm_int)); |
| } catch (...) { |
| } |
| } |
| } |
| const auto ret = serializer_.serialize(flow, std::make_shared<ArchiveWriter>(*arch, *entry)); |
| if (ret < 0) { |
| return ret; |
| } |
| } |
| |
| return gsl::narrow<int64_t>(size_); |
| } |
| }; |
| }; |
| |
| class TarMerge: public ArchiveMerge, public MergeBin { |
| public: |
| void merge(core::ProcessSession &session, std::deque<std::shared_ptr<core::FlowFile>> &flows, |
| FlowFileSerializer& serializer, const std::shared_ptr<core::FlowFile> &merge_flow) override; |
| }; |
| |
| class ZipMerge: public ArchiveMerge, public MergeBin { |
| public: |
| void merge(core::ProcessSession &session, std::deque<std::shared_ptr<core::FlowFile>> &flows, |
| FlowFileSerializer& serializer, const std::shared_ptr<core::FlowFile> &merge_flow) override; |
| }; |
| |
| class AttributeMerger { |
| public: |
| explicit AttributeMerger(std::deque<std::shared_ptr<org::apache::nifi::minifi::core::FlowFile>> &flows) |
| : flows_(flows) {} |
| void mergeAttributes(core::ProcessSession &session, core::FlowFile& merge_flow); |
| virtual ~AttributeMerger() = default; |
| |
| protected: |
| std::map<std::string, std::string> getMergedAttributes(); |
| virtual void processFlowFile(const std::shared_ptr<core::FlowFile> &flow_file, std::map<std::string, std::string> &merged_attributes) = 0; |
| |
| const std::deque<std::shared_ptr<core::FlowFile>> &flows_; |
| }; |
| |
| class KeepOnlyCommonAttributesMerger: public AttributeMerger { |
| public: |
| explicit KeepOnlyCommonAttributesMerger(std::deque<std::shared_ptr<org::apache::nifi::minifi::core::FlowFile>> &flows) |
| : AttributeMerger(flows) {} |
| |
| protected: |
| void processFlowFile(const std::shared_ptr<core::FlowFile> &flow_file, std::map<std::string, std::string> &merged_attributes) override; |
| }; |
| |
| class KeepAllUniqueAttributesMerger: public AttributeMerger { |
| public: |
| explicit KeepAllUniqueAttributesMerger(std::deque<std::shared_ptr<org::apache::nifi::minifi::core::FlowFile>> &flows) |
| : AttributeMerger(flows) {} |
| |
| protected: |
| void processFlowFile(const std::shared_ptr<core::FlowFile> &flow_file, std::map<std::string, std::string> &merged_attributes) override; |
| |
| private: |
| std::vector<std::string> removed_attributes_; |
| }; |
| |
| /** |
| * A processor that merges multiple correlated flow files to a single flow file |
| * |
| * Concepts: |
| * - Batch size: represents the maximum number of flow files to be processed from the incoming relationship |
| * - Bin (or bundle): represents a set of flow files that belong together defined by the processor properties. Correlated flow files are defined by the CorrelationAttributeName property which |
| * defines the attribute that provides the groupid for the bin the flow file belongs to |
| * - Ready bin: when a bin reaches a limit defined by the maximum age or the maximum size, the bin becomes ready, and ready bins can be merged |
| * - Group: a set of bins with the same groupid. In case a bin cannot accept a new flow files (e.g. it would go above its size limit), a new bin is created with this new flow file and added |
| * to the same group of bins |
| */ |
| class MergeContent : public processors::BinFiles { |
| public: |
| explicit MergeContent(core::ProcessorMetadata metadata) |
| : processors::BinFiles(metadata) { |
| mergeStrategy_ = merge_content_options::MERGE_STRATEGY_DEFRAGMENT; |
| mergeFormat_ = merge_content_options::MERGE_FORMAT_CONCAT_VALUE; |
| delimiterStrategy_ = merge_content_options::DELIMITER_STRATEGY_FILENAME; |
| keepPath_ = false; |
| attributeStrategy_ = merge_content_options::ATTRIBUTE_STRATEGY_KEEP_COMMON; |
| } |
| ~MergeContent() override = default; |
| |
| EXTENSIONAPI static constexpr const char* Description = "Merges a Group of FlowFiles together based on a user-defined strategy and packages them into a single FlowFile. " |
| "MergeContent should be configured with only one incoming connection as it won't create grouped Flow Files." |
| "This processor updates the mime.type attribute as appropriate."; |
| |
| EXTENSIONAPI static constexpr auto MergeStrategy = core::PropertyDefinitionBuilder<2>::createProperty("Merge Strategy") |
| .withDescription("Defragment or Bin-Packing Algorithm") |
| .withAllowedValues({merge_content_options::MERGE_STRATEGY_DEFRAGMENT, merge_content_options::MERGE_STRATEGY_BIN_PACK}) |
| .withDefaultValue(merge_content_options::MERGE_STRATEGY_BIN_PACK) |
| .build(); |
| EXTENSIONAPI static constexpr auto MergeFormat = core::PropertyDefinitionBuilder<4>::createProperty("Merge Format") |
| .withDescription("Merge Format") |
| .withAllowedValues({ |
| merge_content_options::MERGE_FORMAT_CONCAT_VALUE, |
| merge_content_options::MERGE_FORMAT_TAR_VALUE, |
| merge_content_options::MERGE_FORMAT_ZIP_VALUE, |
| merge_content_options::MERGE_FORMAT_FLOWFILE_STREAM_V3_VALUE}) |
| .withDefaultValue(merge_content_options::MERGE_FORMAT_CONCAT_VALUE) |
| .build(); |
| EXTENSIONAPI static constexpr auto CorrelationAttributeName = core::PropertyDefinitionBuilder<>::createProperty("Correlation Attribute Name") |
| .withDescription("Correlation Attribute Name") |
| .build(); |
| EXTENSIONAPI static constexpr auto DelimiterStrategy = core::PropertyDefinitionBuilder<2>::createProperty("Delimiter Strategy") |
| .withDescription("Determines if Header, Footer, and Demarcator should point to files") |
| .withAllowedValues({merge_content_options::DELIMITER_STRATEGY_FILENAME, merge_content_options::DELIMITER_STRATEGY_TEXT}) |
| .withDefaultValue(merge_content_options::DELIMITER_STRATEGY_FILENAME) |
| .build(); |
| EXTENSIONAPI static constexpr auto Header = core::PropertyDefinitionBuilder<>::createProperty("Header File") |
| .withDescription("Filename specifying the header to use") |
| .build(); |
| EXTENSIONAPI static constexpr auto Footer = core::PropertyDefinitionBuilder<>::createProperty("Footer File") |
| .withDescription("Filename specifying the footer to use") |
| .build(); |
| EXTENSIONAPI static constexpr auto Demarcator = core::PropertyDefinitionBuilder<>::createProperty("Demarcator File") |
| .withDescription("Filename specifying the demarcator to use") |
| .build(); |
| EXTENSIONAPI static constexpr auto KeepPath = core::PropertyDefinitionBuilder<>::createProperty("Keep Path") |
| .withDescription("If using the Zip or Tar Merge Format, specifies whether or not the FlowFiles' paths should be included in their entry") |
| .withValidator(core::StandardPropertyValidators::BOOLEAN_VALIDATOR) |
| .withDefaultValue("false") |
| .build(); |
| EXTENSIONAPI static constexpr auto AttributeStrategy = core::PropertyDefinitionBuilder<2>::createProperty("Attribute Strategy") |
| .withDescription("Determines which FlowFile attributes should be added to the bundle. If 'Keep All Unique Attributes' is selected, " |
| "any attribute on any FlowFile that gets bundled will be kept unless its value conflicts with the value from another FlowFile " |
| "(in which case neither, or none, of the conflicting attributes will be kept). If 'Keep Only Common Attributes' is selected, " |
| "only the attributes that exist on all FlowFiles in the bundle, with the same value, will be preserved.") |
| .withAllowedValues({merge_content_options::ATTRIBUTE_STRATEGY_KEEP_COMMON, merge_content_options::ATTRIBUTE_STRATEGY_KEEP_ALL_UNIQUE}) |
| .withDefaultValue(merge_content_options::ATTRIBUTE_STRATEGY_KEEP_COMMON) |
| .build(); |
| EXTENSIONAPI static constexpr auto Properties = utils::array_cat(BinFiles::Properties, std::to_array<core::PropertyReference>({ |
| MergeStrategy, |
| MergeFormat, |
| CorrelationAttributeName, |
| DelimiterStrategy, |
| KeepPath, |
| Header, |
| Footer, |
| Demarcator, |
| AttributeStrategy |
| })); |
| |
| |
| EXTENSIONAPI static constexpr auto Merge = core::RelationshipDefinition{"merged", "The FlowFile containing the merged content"}; |
| EXTENSIONAPI static constexpr auto Relationships = utils::array_cat(BinFiles::Relationships, std::array{Merge}); |
| |
| EXTENSIONAPI static constexpr bool SupportsDynamicProperties = false; |
| EXTENSIONAPI static constexpr bool SupportsDynamicRelationships = false; |
| EXTENSIONAPI static constexpr core::annotation::Input InputRequirement = core::annotation::Input::INPUT_REQUIRED; |
| EXTENSIONAPI static constexpr bool IsSingleThreaded = true; |
| |
| ADD_COMMON_VIRTUAL_FUNCTIONS_FOR_PROCESSORS |
| |
| void onSchedule(core::ProcessContext& context, core::ProcessSessionFactory& session_factory) override; |
| void onTrigger(core::ProcessContext& context, core::ProcessSession& session) override; |
| void initialize() override; |
| bool processBin(core::ProcessSession &session, std::unique_ptr<Bin> &bin) override; |
| |
| protected: |
| // Returns a group ID representing a bin. This allows flow files to be binned into like groups |
| std::string getGroupId(const std::shared_ptr<core::FlowFile>& flow) override; |
| // check whether the defragment bin is validate |
| static bool checkDefragment(std::unique_ptr<Bin> &bin); |
| |
| private: |
| void validatePropertyOptions(); |
| |
| std::string mergeStrategy_; |
| std::string mergeFormat_; |
| std::string correlationAttributeName_; |
| bool keepPath_; |
| std::string delimiterStrategy_; |
| std::string header_; |
| std::string footer_; |
| std::string demarcator_; |
| std::string headerContent_; |
| std::string footerContent_; |
| std::string demarcatorContent_; |
| std::string attributeStrategy_; |
| static std::string readContent(const std::string& path); |
| }; |
| |
| } // namespace org::apache::nifi::minifi::processors |