blob: b847f045171535bb1d2f98b5b090d879313c8092 [file] [log] [blame]
/**
 * @file MergeContent.h
 * MergeContent class declaration
 *
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
#ifndef __MERGE_CONTENT_H__
#define __MERGE_CONTENT_H__
#include <cstdint>
#include <deque>
#include <map>
#include <memory>
#include <string>
#include <utility>
#include <vector>

#include "ArchiveCommon.h"
#include "BinFiles.h"
#include "archive_entry.h"
#include "archive.h"
#include "core/logging/LoggerConfiguration.h"
#include "serialization/FlowFileSerializer.h"
#include "utils/gsl.h"
namespace org {
namespace apache {
namespace nifi {
namespace minifi {
namespace processors {
/**
 * String constants naming the option values used by the merge processor
 * (strategy, output format, delimiter handling and attribute handling).
 */
namespace merge_content_options {

// Merge strategy names.
constexpr const char* MERGE_STRATEGY_BIN_PACK = "Bin-Packing Algorithm";
constexpr const char* MERGE_STRATEGY_DEFRAGMENT = "Defragment";

// Merge format names.
constexpr const char* MERGE_FORMAT_TAR_VALUE = "TAR";
constexpr const char* MERGE_FORMAT_ZIP_VALUE = "ZIP";
constexpr const char* MERGE_FORMAT_CONCAT_VALUE = "Binary Concatenation";
constexpr const char* MERGE_FORMAT_FLOWFILE_STREAM_V3_VALUE = "FlowFile Stream, v3";

// Delimiter strategy names.
constexpr const char* DELIMITER_STRATEGY_FILENAME = "Filename";
constexpr const char* DELIMITER_STRATEGY_TEXT = "Text";

// Attribute strategy names.
constexpr const char* ATTRIBUTE_STRATEGY_KEEP_COMMON = "Keep Only Common Attributes";
constexpr const char* ATTRIBUTE_STRATEGY_KEEP_ALL_UNIQUE = "Keep All Unique Attributes";

}  // namespace merge_content_options
// MergeBin Class
/**
 * Abstract interface implemented by every merge format in this file
 * (binary concatenation, TAR, ZIP).
 */
class MergeBin {
 public:
  virtual ~MergeBin() = default;

  /**
   * Merge the flows in the bin.
   * @param context process context
   * @param session process session
   * @param flows flow files to be merged
   * @param serializer serializer applied to each flow file
   * @param flowFile presumably the flow file that receives the merged content
   *        (the derived classes name it merge_flow) - confirm in the implementations
   */
  virtual void merge(core::ProcessContext *context, core::ProcessSession *session,
      std::deque<std::shared_ptr<core::FlowFile>> &flows, FlowFileSerializer& serializer, const std::shared_ptr<core::FlowFile> &flowFile) = 0;
};
// BinaryConcatenationMerge Class
class BinaryConcatenationMerge : public MergeBin {
BinaryConcatenationMerge(const std::string& header, const std::string& footer, const std::string& demarcator);
void merge(core::ProcessContext *context, core::ProcessSession *session,
std::deque<std::shared_ptr<core::FlowFile>> &flows, FlowFileSerializer& serializer, const std::shared_ptr<core::FlowFile> &flowFile);
// Nest Callback Class for write stream
class WriteCallback: public OutputStreamCallback {
WriteCallback(std::string &header, std::string &footer, std::string &demarcator,
std::deque<std::shared_ptr<core::FlowFile>> &flows, FlowFileSerializer& serializer) :
header_(header), footer_(footer), demarcator_(demarcator), flows_(flows), serializer_(serializer) {
std::string &header_;
std::string &footer_;
std::string &demarcator_;
std::deque<std::shared_ptr<core::FlowFile>> &flows_;
FlowFileSerializer& serializer_;
int64_t process(const std::shared_ptr<io::BaseStream>& stream) {
int64_t ret = 0;
if (!header_.empty()) {
int64_t len = stream->write(reinterpret_cast<uint8_t*>(const_cast<char*>(, gsl::narrow<int>(header_.size()));
if (len < 0)
return len;
ret += len;
bool isFirst = true;
for (auto flow : flows_) {
if (!isFirst && !demarcator_.empty()) {
int64_t len = stream->write(reinterpret_cast<uint8_t*>(const_cast<char*>(, gsl::narrow<int>(demarcator_.size()));
if (len < 0)
return len;
ret += len;
int len = serializer_.serialize(flow, stream);
if (len < 0)
return len;
ret += len;
isFirst = false;
if (!footer_.empty()) {
int64_t len = stream->write(reinterpret_cast<uint8_t*>(const_cast<char*>(, gsl::narrow<int>(footer_.size()));
if (len < 0)
return len;
ret += len;
return ret;
std::string header_;
std::string footer_;
std::string demarcator_;
// Archive Class
class ArchiveMerge {
class ArchiveWriter : public io::OutputStream {
ArchiveWriter(struct archive *arch, struct archive_entry *entry) : arch_(arch), entry_(entry) {}
int write(const uint8_t* data, int size) override {
if (!header_emitted_) {
if (archive_write_header(arch_, entry_) != ARCHIVE_OK) {
return -1;
header_emitted_ = true;
int totalWrote = 0;
int remaining = size;
while (remaining > 0) {
const auto ret = archive_write_data(arch_, data + totalWrote, remaining);
if (ret < 0) {
return ret;
if (ret == 0) {
totalWrote += ret;
remaining -= ret;
return totalWrote;
struct archive *arch_;
struct archive_entry *entry_;
bool header_emitted_{false};
// Nest Callback Class for write stream
class WriteCallback: public OutputStreamCallback {
WriteCallback(std::string merge_type, std::deque<std::shared_ptr<core::FlowFile>> &flows, FlowFileSerializer& serializer)
: merge_type_(merge_type),
serializer_(serializer) {
size_ = 0;
stream_ = nullptr;
~WriteCallback() = default;
std::string merge_type_;
std::deque<std::shared_ptr<core::FlowFile>> &flows_;
std::shared_ptr<io::BaseStream> stream_;
size_t size_;
std::shared_ptr<logging::Logger> logger_;
FlowFileSerializer& serializer_;
static la_ssize_t archive_write(struct archive* /*arch*/, void *context, const void *buff, size_t size) {
WriteCallback *callback = (WriteCallback *) context;
uint8_t* data = reinterpret_cast<uint8_t*>(const_cast<void*>(buff));
la_ssize_t totalWrote = 0;
int remaining = gsl::narrow<int>(size);
while (remaining > 0) {
la_ssize_t ret = callback->stream_->write(data + totalWrote, remaining);
if (ret < 0) {
// libarchive expects us to return -1 on error
return -1;
if (ret == 0) {
callback->size_ += ret;
totalWrote += ret;
remaining -= ret;
return totalWrote;
int64_t process(const std::shared_ptr<io::BaseStream>& stream) {
struct archive *arch;
arch = archive_write_new();
if (merge_type_ == merge_content_options::MERGE_FORMAT_TAR_VALUE) {
archive_write_set_format_pax_restricted(arch); // tar format
if (merge_type_ == merge_content_options::MERGE_FORMAT_ZIP_VALUE) {
archive_write_set_format_zip(arch); // zip format
archive_write_set_bytes_per_block(arch, 0);
stream_ = stream;
archive_write_open(arch, this, NULL, archive_write, NULL);
for (auto flow : flows_) {
struct archive_entry *entry = archive_entry_new();
std::string fileName;
flow->getAttribute(core::SpecialFlowAttribute::FILENAME, fileName);
archive_entry_set_pathname(entry, fileName.c_str());
archive_entry_set_size(entry, flow->getSize());
archive_entry_set_mode(entry, S_IFREG | 0755);
if (merge_type_ == merge_content_options::MERGE_FORMAT_TAR_VALUE) {
std::string perm;
int permInt;
if (flow->getAttribute(BinFiles::TAR_PERMISSIONS_ATTRIBUTE, perm)) {
try {
permInt = std::stoi(perm);
logger_->log_debug("Merge Tar File %s permission %s", fileName, perm);
archive_entry_set_perm(entry, (mode_t) permInt);
} catch (...) {
int ret = serializer_.serialize(flow, std::make_shared<ArchiveWriter>(arch, entry));
if (ret < 0) {
return ret;
return size_;
// TarMerge Class
/**
 * MergeBin implementation for the TAR format; inherits the libarchive
 * helpers from ArchiveMerge.
 */
class TarMerge: public ArchiveMerge, public MergeBin {
 public:
  void merge(core::ProcessContext *context, core::ProcessSession *session, std::deque<std::shared_ptr<core::FlowFile>> &flows,
      FlowFileSerializer& serializer, const std::shared_ptr<core::FlowFile> &merge_flow) override;
};
// ZipMerge Class
/**
 * MergeBin implementation for the ZIP format; inherits the libarchive
 * helpers from ArchiveMerge.
 */
class ZipMerge: public ArchiveMerge, public MergeBin {
 public:
  void merge(core::ProcessContext *context, core::ProcessSession *session, std::deque<std::shared_ptr<core::FlowFile>> &flows,
      FlowFileSerializer& serializer, const std::shared_ptr<core::FlowFile> &merge_flow) override;
};
/**
 * Base class for attribute merge strategies. Subclasses supply
 * processFlowFile(), which receives one flow file and the attribute map
 * being accumulated.
 */
class AttributeMerger {
 public:
  /**
   * @param flows flow files whose attributes are merged; held by reference,
   *        so the deque must outlive this object
   */
  explicit AttributeMerger(std::deque<std::shared_ptr<org::apache::nifi::minifi::core::FlowFile>> &flows)
    : flows_(flows) {}
  void mergeAttributes(core::ProcessSession *session, const std::shared_ptr<core::FlowFile> &merge_flow);
  virtual ~AttributeMerger() = default;

 protected:
  std::map<std::string, std::string> getMergedAttributes();
  // Strategy hook overridden by each concrete merger.
  virtual void processFlowFile(const std::shared_ptr<core::FlowFile> &flow_file, std::map<std::string, std::string> &merged_attributes) = 0;

  const std::deque<std::shared_ptr<core::FlowFile>> &flows_;
};
/**
 * Attribute merger selected by the "Keep Only Common Attributes" strategy.
 */
class KeepOnlyCommonAttributesMerger: public AttributeMerger {
 public:
  explicit KeepOnlyCommonAttributesMerger(std::deque<std::shared_ptr<org::apache::nifi::minifi::core::FlowFile>> &flows)
    : AttributeMerger(flows) {}

 protected:
  void processFlowFile(const std::shared_ptr<core::FlowFile> &flow_file, std::map<std::string, std::string> &merged_attributes) override;
};
/**
 * Attribute merger selected by the "Keep All Unique Attributes" strategy.
 */
class KeepAllUniqueAttributesMerger: public AttributeMerger {
 public:
  explicit KeepAllUniqueAttributesMerger(std::deque<std::shared_ptr<org::apache::nifi::minifi::core::FlowFile>> &flows)
    : AttributeMerger(flows) {}

 protected:
  void processFlowFile(const std::shared_ptr<core::FlowFile> &flow_file, std::map<std::string, std::string> &merged_attributes) override;

 private:
  // Presumably the names of attributes already discarded during the merge - confirm in the implementation.
  std::vector<std::string> removed_attributes_;
};
// MergeContent Class
class MergeContent : public processors::BinFiles {
// Constructor
* Create a new processor
explicit MergeContent(std::string name, utils::Identifier uuid = utils::Identifier())
: processors::BinFiles(name, uuid),
logger_(logging::LoggerFactory<MergeContent>::getLogger()) {
mergeStrategy_ = merge_content_options::MERGE_STRATEGY_DEFRAGMENT;
mergeFormat_ = merge_content_options::MERGE_FORMAT_CONCAT_VALUE;
delimiterStrategy_ = merge_content_options::DELIMITER_STRATEGY_FILENAME;
keepPath_ = false;
attributeStrategy_ = merge_content_options::ATTRIBUTE_STRATEGY_KEEP_COMMON;
// Destructor
virtual ~MergeContent() = default;
// Processor Name
static constexpr char const* ProcessorName = "MergeContent";
// Supported Properties
static core::Property MergeStrategy;
static core::Property MergeFormat;
static core::Property CorrelationAttributeName;
static core::Property DelimiterStrategy;
static core::Property KeepPath;
static core::Property Header;
static core::Property Footer;
static core::Property Demarcator;
static core::Property AttributeStrategy;
// Supported Relationships
static core::Relationship Merge;
* Function that's executed when the processor is scheduled.
* @param context process context.
* @param sessionFactory process session factory that is used when creating
* ProcessSession objects.
void onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory);
// OnTrigger method, implemented by NiFi MergeContent
virtual void onTrigger(core::ProcessContext *context, core::ProcessSession *session);
// Initialize, over write by NiFi MergeContent
virtual void initialize(void);
virtual bool processBin(core::ProcessContext *context, core::ProcessSession *session, std::unique_ptr<Bin> &bin);
// Returns a group ID representing a bin. This allows flow files to be binned into like groups
virtual std::string getGroupId(core::ProcessContext *context, std::shared_ptr<core::FlowFile> flow);
// check whether the defragment bin is validate
bool checkDefragment(std::unique_ptr<Bin> &bin);
void validatePropertyOptions();
std::shared_ptr<logging::Logger> logger_;
std::string mergeStrategy_;
std::string mergeFormat_;
std::string correlationAttributeName_;
bool keepPath_;
std::string delimiterStrategy_;
std::string header_;
std::string footer_;
std::string demarcator_;
std::string headerContent_;
std::string footerContent_;
std::string demarcatorContent_;
std::string attributeStrategy_;
// readContent
std::string readContent(std::string path);
// Registers MergeContent together with its description text.
// Each literal ends with a space so the concatenated sentences stay separated.
REGISTER_RESOURCE(MergeContent, "Merges a Group of FlowFiles together based on a user-defined strategy and packages them into a single FlowFile. "
    "MergeContent should be configured with only one incoming connection as it won't create grouped Flow Files. "
    "This processor updates the mime.type attribute as appropriate.");
} /* namespace processors */
} /* namespace minifi */
} /* namespace nifi */
} /* namespace apache */
} /* namespace org */