blob: dbbe7e063079a8a9826f93396b41b9560271ccb1 [file] [log] [blame]
/**
* @file MergeContent.cpp
* MergeContent class implementation
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "MergeContent.h"
#include <stdio.h>
#include <algorithm>
#include <deque>
#include <exception>
#include <fstream>
#include <map>
#include <memory>
#include <numeric>
#include <queue>
#include <set>
#include <string>
#include <utility>
#include <vector>
#include "utils/TimeUtil.h"
#include "utils/StringUtils.h"
#include "utils/GeneralUtils.h"
#include "core/ProcessContext.h"
#include "core/ProcessSession.h"
namespace org {
namespace apache {
namespace nifi {
namespace minifi {
namespace processors {
// Grouping strategy. Defragment groups FlowFiles by their fragment identifier
// (see getGroupId()) and orders them by fragment index before merging (see
// processBin()); Bin-Packing merges the bin's FlowFiles without reordering.
core::Property MergeContent::MergeStrategy(
core::PropertyBuilder::createProperty("Merge Strategy")
->withDescription("Defragment or Bin-Packing Algorithm")
->withAllowableValues<std::string>({merge_content_options::MERGE_STRATEGY_DEFRAGMENT, merge_content_options::MERGE_STRATEGY_BIN_PACK})
->withDefaultValue(merge_content_options::MERGE_STRATEGY_DEFRAGMENT)->build());
// Output format. processBin() maps the concat, TAR and ZIP values to
// BinaryConcatenationMerge, TarMerge and ZipMerge respectively.
core::Property MergeContent::MergeFormat(
core::PropertyBuilder::createProperty("Merge Format")
->withDescription("Merge Format")
->withAllowableValues<std::string>({merge_content_options::MERGE_FORMAT_CONCAT_VALUE, merge_content_options::MERGE_FORMAT_TAR_VALUE, merge_content_options::MERGE_FORMAT_ZIP_VALUE})
->withDefaultValue(merge_content_options::MERGE_FORMAT_CONCAT_VALUE)->build());
// Name of a FlowFile attribute whose non-empty value becomes the bin group id
// (see getGroupId()).
// NOTE(review): the description just repeats the property name; a real
// description would help users.
core::Property MergeContent::CorrelationAttributeName("Correlation Attribute Name", "Correlation Attribute Name", "");
// How Header/Footer/Demarcator are interpreted: with the Filename strategy,
// onSchedule() loads the contents of each named file; with the Text strategy,
// the property values themselves are used verbatim.
core::Property MergeContent::DelimiterStrategy(
core::PropertyBuilder::createProperty("Delimiter Strategy")
->withDescription("Determines if Header, Footer, and Demarcator should point to files")
->withAllowableValues<std::string>({merge_content_options::DELIMITER_STRATEGY_FILENAME, merge_content_options::DELIMITER_STRATEGY_TEXT})
->withDefaultValue(merge_content_options::DELIMITER_STRATEGY_FILENAME)->build());
// Header, footer and demarcator. Only BinaryConcatenationMerge passes them to
// its write callback; TarMerge and ZipMerge accept but do not use them.
// NOTE(review): the descriptions say "Filename", which is only accurate for
// the Filename delimiter strategy.
core::Property MergeContent::Header("Header File", "Filename specifying the header to use", "");
core::Property MergeContent::Footer("Footer File", "Filename specifying the footer to use", "");
core::Property MergeContent::Demarcator("Demarcator File", "Filename specifying the demarcator to use", "");
// Parsed with StringUtils::StringToBool in onSchedule(); defaults to false.
// NOTE(review): not read elsewhere in this file; presumably consumed by the
// archive write callback. Confirm.
core::Property MergeContent::KeepPath(
core::PropertyBuilder::createProperty("Keep Path")
->withDescription("If using the Zip or Tar Merge Format, specifies whether or not the FlowFiles' paths should be included in their entry")
->withDefaultValue(false)->build());
// Selects KeepOnlyCommonAttributesMerger or KeepAllUniqueAttributesMerger in
// processBin(); any other value makes processBin() throw.
core::Property MergeContent::AttributeStrategy(
core::PropertyBuilder::createProperty("Attribute Strategy")
->withDescription("Determines which FlowFile attributes should be added to the bundle. If 'Keep All Unique Attributes' is selected, any attribute on any FlowFile that gets bundled will be kept unless its value conflicts with the value from another FlowFile. If 'Keep Only Common Attributes' is selected, only the attributes that exist on all FlowFiles in the bundle, with the same value, will be preserved.")
->withAllowableValues<std::string>({merge_content_options::ATTRIBUTE_STRATEGY_KEEP_COMMON, merge_content_options::ATTRIBUTE_STRATEGY_KEEP_ALL_UNIQUE})
->withDefaultValue(merge_content_options::ATTRIBUTE_STRATEGY_KEEP_COMMON)->build());
// Relationship that receives the merged FlowFile.
core::Relationship MergeContent::Merge("merged", "The FlowFile containing the merged content");
// MIME type per merge format. NOTE(review): presumably what each class's
// getMergedContentType() returns; merge() writes that value to the mime type attribute.
const char *BinaryConcatenationMerge::mimeType = "application/octet-stream";
const char *TarMerge::mimeType = "application/tar";
const char *ZipMerge::mimeType = "application/zip";
void MergeContent::initialize() {
// Set the supported properties
std::set<core::Property> properties;
properties.insert(MinSize);
properties.insert(MaxSize);
properties.insert(MinEntries);
properties.insert(MaxEntries);
properties.insert(MaxBinAge);
properties.insert(MaxBinCount);
properties.insert(MergeStrategy);
properties.insert(MergeFormat);
properties.insert(CorrelationAttributeName);
properties.insert(DelimiterStrategy);
properties.insert(Header);
properties.insert(Footer);
properties.insert(Demarcator);
properties.insert(KeepPath);
properties.insert(AttributeStrategy);
setSupportedProperties(properties);
// Set the supported relationships
std::set<core::Relationship> relationships;
relationships.insert(Original);
relationships.insert(Failure);
relationships.insert(Merge);
setSupportedRelationships(relationships);
}
/**
 * Reads an entire file into memory as raw bytes.
 * @param path file to read
 * @return the file's contents. An empty string is returned when the file
 *         cannot be opened, its size cannot be determined, or it is empty.
 *         If fewer bytes than the reported size can be read, only the bytes
 *         actually read are returned (no zero padding).
 */
std::string MergeContent::readContent(std::string path) {
  std::ifstream in(path.c_str(), std::ios::in | std::ios::binary);
  if (!in) {
    return {};
  }
  in.seekg(0, std::ios::end);
  // tellg() reports -1 on failure; narrowing that to size_t would throw, so
  // treat it (and an empty file) as "no content".
  const std::streamoff length = in.tellg();
  if (length <= 0) {
    return {};
  }
  std::string contents(gsl::narrow<size_t>(length), '\0');
  in.seekg(0, std::ios::beg);
  in.read(&contents[0], static_cast<std::streamsize>(length));
  // A short read (file shrank, I/O error) must not leave trailing zero bytes.
  contents.resize(static_cast<size_t>(in.gcount()));
  return contents;
}
/**
 * Reads the processor configuration and prepares the header/footer/demarcator
 * content used when merging.
 * Each string property overwrites its member only when set to a non-empty
 * value; otherwise the member keeps its current value. With the Filename
 * delimiter strategy the named files are read now; with the Text strategy the
 * property values are used verbatim.
 */
void MergeContent::onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory) {
  std::string value;
  BinFiles::onSchedule(context, sessionFactory);

  const auto assignIfSet = [context, &value](const core::Property &property, std::string &target) {
    if (context->getProperty(property.getName(), value) && !value.empty()) {
      target = value;
    }
  };
  assignIfSet(MergeStrategy, mergeStrategy_);
  assignIfSet(MergeFormat, mergeFormat_);
  assignIfSet(CorrelationAttributeName, correlationAttributeName_);
  assignIfSet(DelimiterStrategy, delimiterStrategy_);
  assignIfSet(Header, header_);
  assignIfSet(Footer, footer_);
  assignIfSet(Demarcator, demarcator_);
  if (context->getProperty(KeepPath.getName(), value) && !value.empty()) {
    org::apache::nifi::minifi::utils::StringUtils::StringToBool(value, keepPath_);
  }
  assignIfSet(AttributeStrategy, attributeStrategy_);

  // NOTE(review): presumably makes the bin manager take each bin's expected
  // size from the fragment count attribute. Confirm in BinFiles.
  if (mergeStrategy_ == merge_content_options::MERGE_STRATEGY_DEFRAGMENT) {
    binManager_.setFileCount(FRAGMENT_COUNT_ATTRIBUTE);
  }
  logger_->log_debug("Merge Content: Strategy [%s] Format [%s] Correlation Attribute [%s] Delimiter [%s]", mergeStrategy_, mergeFormat_, correlationAttributeName_, delimiterStrategy_);
  logger_->log_debug("Merge Content: Footer [%s] Header [%s] Demarcator [%s] KeepPath [%d]", footer_, header_, demarcator_, keepPath_);

  if (delimiterStrategy_ == merge_content_options::DELIMITER_STRATEGY_FILENAME) {
    // Property values are file names: load the contents of each configured file.
    const auto loadIfNamed = [this](const std::string &fileName, std::string &content) {
      if (!fileName.empty()) {
        content = readContent(fileName);
      }
    };
    loadIfNamed(header_, headerContent_);
    loadIfNamed(footer_, footerContent_);
    loadIfNamed(demarcator_, demarcatorContent_);
  } else if (delimiterStrategy_ == merge_content_options::DELIMITER_STRATEGY_TEXT) {
    // Property values are the literal text to insert.
    headerContent_ = header_;
    footerContent_ = footer_;
    demarcatorContent_ = demarcator_;
  }
}
/**
 * Computes the bin group id for a FlowFile.
 * The configured correlation attribute is used when present with a non-empty
 * value. Otherwise, in Defragment mode, the fragment identifier attribute is
 * used when present. In all other cases the result is the empty string.
 * @param context unused
 * @param flow FlowFile whose attributes are inspected
 */
std::string MergeContent::getGroupId(core::ProcessContext *context, std::shared_ptr<core::FlowFile> flow) {
  std::string attributeValue;
  if (!correlationAttributeName_.empty()
      && flow->getAttribute(correlationAttributeName_, attributeValue)
      && !attributeValue.empty()) {
    return attributeValue;
  }
  if (mergeStrategy_ == merge_content_options::MERGE_STRATEGY_DEFRAGMENT
      && flow->getAttribute(FRAGMENT_ID_ATTRIBUTE, attributeValue)) {
    return attributeValue;
  }
  return "";
}
/**
 * Validates that a Defragment bin holds exactly one complete fragment set.
 * The following must all hold:
 *  - every FlowFile carries the same fragment identifier and fragment count
 *    as the first FlowFile;
 *  - the bin holds exactly `fragment count` FlowFiles;
 *  - every fragment index is an integer in [0, count) and occurs exactly once.
 * @param bin bin to check (its FlowFiles are not modified)
 * @return true when the bin can be reassembled in fragment-index order
 */
bool MergeContent::checkDefragment(std::unique_ptr<Bin> &bin) {
  std::deque<std::shared_ptr<core::FlowFile>> &flows = bin->getFlowFile();
  if (flows.empty())
    return false;
  const std::shared_ptr<core::FlowFile> &front = flows.front();
  std::string fragId;
  if (!front->getAttribute(BinFiles::FRAGMENT_ID_ATTRIBUTE, fragId))
    return false;
  std::string fragCount;
  if (!front->getAttribute(BinFiles::FRAGMENT_COUNT_ATTRIBUTE, fragCount))
    return false;
  int fragCountInt;
  try {
    fragCountInt = std::stoi(fragCount);
  } catch (const std::exception&) {
    return false;
  }
  // An incomplete bin (e.g. one flushed on age) or an over-full one cannot be
  // reassembled into the original content.
  if (fragCountInt < 0 || static_cast<size_t>(fragCountInt) != flows.size())
    return false;
  // Marks indices already seen; with the size check above this guarantees
  // every index in [0, count) is present exactly once.
  std::vector<char> seenIndex(flows.size(), 0);
  for (const auto &flow : flows) {
    std::string value;
    if (!flow->getAttribute(BinFiles::FRAGMENT_ID_ATTRIBUTE, value) || value != fragId)
      return false;
    if (!flow->getAttribute(BinFiles::FRAGMENT_COUNT_ATTRIBUTE, value) || value != fragCount)
      return false;
    if (!flow->getAttribute(BinFiles::FRAGMENT_INDEX_ATTRIBUTE, value))
      return false;
    int index;
    try {
      index = std::stoi(value);
    } catch (const std::exception&) {
      return false;
    }
    if (index < 0 || index >= fragCountInt || seenIndex[static_cast<size_t>(index)])
      return false;
    seenIndex[static_cast<size_t>(index)] = 1;
  }
  return true;
}
/**
 * MergeContent adds no trigger behavior of its own: the call is forwarded
 * unchanged to BinFiles::onTrigger.
 */
void MergeContent::onTrigger(core::ProcessContext *context, core::ProcessSession *session) {
  this->BinFiles::onTrigger(context, session);
}
/**
 * Merges the FlowFiles of one bin into a single FlowFile.
 * In Defragment mode the bin is validated with checkDefragment() and sorted by
 * fragment index first. On success, the merged FlowFile is transferred to
 * Merge and every original to Original.
 * @return true on success. Returns false when the merge strategy or format is
 *         unknown, defragment validation fails, or the merge itself throws;
 *         in those cases nothing is transferred here.
 * @throws minifi::Exception (PROCESSOR_EXCEPTION) when the attribute strategy
 *         is not recognised
 */
bool MergeContent::processBin(core::ProcessContext *context, core::ProcessSession *session, std::unique_ptr<Bin> &bin) {
  if (mergeStrategy_ != merge_content_options::MERGE_STRATEGY_DEFRAGMENT && mergeStrategy_ != merge_content_options::MERGE_STRATEGY_BIN_PACK)
    return false;
  if (mergeStrategy_ == merge_content_options::MERGE_STRATEGY_DEFRAGMENT) {
    if (!checkDefragment(bin)) {
      logger_->log_error("Merge Content check defragment failed");
      return false;
    }
    // checkDefragment() verified every fragment index parses as an int, so
    // std::stoi cannot throw inside the comparator.
    const auto fragmentIndex = [](const std::shared_ptr<core::FlowFile> &flow) {
      std::string value;
      flow->getAttribute(BinFiles::FRAGMENT_INDEX_ATTRIBUTE, value);
      return std::stoi(value);
    };
    std::deque<std::shared_ptr<core::FlowFile>> &flows = bin->getFlowFile();
    std::sort(flows.begin(), flows.end(), [&fragmentIndex](const std::shared_ptr<core::FlowFile> &first, const std::shared_ptr<core::FlowFile> &second) {
      return fragmentIndex(first) < fragmentIndex(second);
    });
  }
  std::unique_ptr<MergeBin> mergeBin;
  if (mergeFormat_ == merge_content_options::MERGE_FORMAT_CONCAT_VALUE)
    mergeBin = utils::make_unique<BinaryConcatenationMerge>();
  else if (mergeFormat_ == merge_content_options::MERGE_FORMAT_TAR_VALUE)
    mergeBin = utils::make_unique<TarMerge>();
  else if (mergeFormat_ == merge_content_options::MERGE_FORMAT_ZIP_VALUE)
    mergeBin = utils::make_unique<ZipMerge>();
  else {
    logger_->log_error("Merge format not supported %s", mergeFormat_);
    return false;
  }
  std::shared_ptr<core::FlowFile> mergeFlow;
  try {
    mergeFlow = mergeBin->merge(context, session, bin->getFlowFile(), headerContent_, footerContent_, demarcatorContent_);
  } catch (const std::exception &e) {
    logger_->log_error("Merge Content merge catch exception: %s", e.what());
    return false;
  } catch (...) {
    logger_->log_error("Merge Content merge catch exception");
    return false;
  }
  if (attributeStrategy_ == merge_content_options::ATTRIBUTE_STRATEGY_KEEP_COMMON)
    KeepOnlyCommonAttributesMerger(bin->getFlowFile()).mergeAttributes(session, mergeFlow);
  else if (attributeStrategy_ == merge_content_options::ATTRIBUTE_STRATEGY_KEEP_ALL_UNIQUE)
    KeepAllUniqueAttributesMerger(bin->getFlowFile()).mergeAttributes(session, mergeFlow);
  else {
    logger_->log_error("Attribute strategy not supported %s", attributeStrategy_);
    throw minifi::Exception(ExceptionType::PROCESSOR_EXCEPTION, "Invalid attribute strategy: " + attributeStrategy_);
  }
  // Set after the attribute merge: otherwise a fragment count shared by all
  // originals would overwrite the bin size recorded here.
  session->putAttribute(mergeFlow, BinFiles::FRAGMENT_COUNT_ATTRIBUTE, std::to_string(bin->getSize()));
  // we successfully merge the flow
  session->transfer(mergeFlow, Merge);
  for (auto &flow : bin->getFlowFile()) {
    session->transfer(flow, Original);
  }
  // The size is formatted as a string: passing the 64-bit value to a printf-style %d is undefined.
  logger_->log_info("Merge FlowFile record UUID %s, payload length %s", mergeFlow->getUUIDStr(), std::to_string(mergeFlow->getSize()));
  return true;
}
/**
 * Concatenates the bin's FlowFiles into a new FlowFile, using the
 * header/footer/demarcator handed to the write callback.
 * The result's filename is copied from the single input's filename, or, for
 * several inputs, from the first input's segment-original-filename attribute.
 * The filename is only set when that value is non-empty.
 * NOTE(review): `flows` must not be empty; front() is called unconditionally.
 */
std::shared_ptr<core::FlowFile> BinaryConcatenationMerge::merge(core::ProcessContext *context, core::ProcessSession *session,
    std::deque<std::shared_ptr<core::FlowFile>> &flows, std::string &header, std::string &footer, std::string &demarcator) {
  auto merged = std::static_pointer_cast<FlowFileRecord>(session->create());
  BinaryConcatenationMerge::WriteCallback writer(header, footer, demarcator, flows, session);
  session->write(merged, &writer);
  session->putAttribute(merged, FlowAttributeKey(MIME_TYPE), getMergedContentType());

  std::string sourceName;
  const std::shared_ptr<core::FlowFile> &first = flows.front();
  if (flows.size() != 1) {
    first->getAttribute(BinFiles::SEGMENT_ORIGINAL_FILENAME, sourceName);
  } else {
    first->getAttribute(FlowAttributeKey(FILENAME), sourceName);
  }
  if (sourceName.empty()) {
    return merged;
  }
  session->putAttribute(merged, FlowAttributeKey(FILENAME), sourceName);
  return merged;
}
/**
 * Packs the bin's FlowFiles into a TAR archive held by a new FlowFile.
 * header/footer/demarcator are accepted for interface compatibility but unused.
 * The archive is named after the single input's filename, or, for several
 * inputs, the first input's segment-original-filename attribute, with ".tar"
 * appended. The merged FlowFile's own filename is read first as the starting
 * value (presumably kept when the source attribute is missing, assuming
 * getAttribute leaves its output untouched on a miss).
 * NOTE(review): `flows` must not be empty; front() is called unconditionally.
 */
std::shared_ptr<core::FlowFile> TarMerge::merge(core::ProcessContext *context, core::ProcessSession *session, std::deque<std::shared_ptr<core::FlowFile>> &flows, std::string &header,
    std::string &footer, std::string &demarcator) {
  auto archive = std::static_pointer_cast<FlowFileRecord>(session->create());
  ArchiveMerge::WriteCallback writer(std::string(merge_content_options::MERGE_FORMAT_TAR_VALUE), flows, session);
  session->write(archive, &writer);
  session->putAttribute(archive, FlowAttributeKey(MIME_TYPE), getMergedContentType());

  std::string archiveName;
  archive->getAttribute(FlowAttributeKey(FILENAME), archiveName);
  const std::shared_ptr<core::FlowFile> &first = flows.front();
  if (flows.size() != 1) {
    first->getAttribute(BinFiles::SEGMENT_ORIGINAL_FILENAME, archiveName);
  } else {
    first->getAttribute(FlowAttributeKey(FILENAME), archiveName);
  }
  if (archiveName.empty()) {
    return archive;
  }
  archiveName.append(".tar");
  session->putAttribute(archive, FlowAttributeKey(FILENAME), archiveName);
  return archive;
}
/**
 * Packs the bin's FlowFiles into a ZIP archive held by a new FlowFile.
 * header/footer/demarcator are accepted for interface compatibility but unused.
 * The archive is named after the single input's filename, or, for several
 * inputs, the first input's segment-original-filename attribute, with ".zip"
 * appended. The merged FlowFile's own filename is read first as the starting
 * value (presumably kept when the source attribute is missing, assuming
 * getAttribute leaves its output untouched on a miss).
 * NOTE(review): `flows` must not be empty; front() is called unconditionally.
 */
std::shared_ptr<core::FlowFile> ZipMerge::merge(core::ProcessContext *context, core::ProcessSession *session, std::deque<std::shared_ptr<core::FlowFile>> &flows, std::string &header,
    std::string &footer, std::string &demarcator) {
  auto archive = std::static_pointer_cast<FlowFileRecord>(session->create());
  ArchiveMerge::WriteCallback writer(std::string(merge_content_options::MERGE_FORMAT_ZIP_VALUE), flows, session);
  session->write(archive, &writer);
  session->putAttribute(archive, FlowAttributeKey(MIME_TYPE), getMergedContentType());

  std::string archiveName;
  archive->getAttribute(FlowAttributeKey(FILENAME), archiveName);
  const std::shared_ptr<core::FlowFile> &first = flows.front();
  if (flows.size() != 1) {
    first->getAttribute(BinFiles::SEGMENT_ORIGINAL_FILENAME, archiveName);
  } else {
    first->getAttribute(FlowAttributeKey(FILENAME), archiveName);
  }
  if (archiveName.empty()) {
    return archive;
  }
  archiveName.append(".zip");
  session->putAttribute(archive, FlowAttributeKey(FILENAME), archiveName);
  return archive;
}
/**
 * Copies every attribute produced by getMergedAttributes() onto merge_flow.
 */
void AttributeMerger::mergeAttributes(core::ProcessSession *session, std::shared_ptr<core::FlowFile> &merge_flow) {
  const std::map<std::string, std::string> mergedAttributes = getMergedAttributes();
  for (auto entry = mergedAttributes.cbegin(); entry != mergedAttributes.cend(); ++entry) {
    session->putAttribute(merge_flow, entry->first, entry->second);
  }
}
std::map<std::string, std::string> AttributeMerger::getMergedAttributes() {
if (flows_.empty()) return {};
std::map<std::string, std::string> sum{ flows_.front()->getAttributes() };
const auto merge_attributes = [this](std::map<std::string, std::string>* const merged_attributes, const std::shared_ptr<core::FlowFile>& flow) {
processFlowFile(flow, *merged_attributes);
return merged_attributes;
};
return *std::accumulate(std::next(flows_.cbegin()), flows_.cend(), &sum, merge_attributes);
}
/**
 * Narrows merged_attributes to the entries that flow_file also has with the
 * same key AND the same value; all other entries are erased.
 */
void KeepOnlyCommonAttributesMerger::processFlowFile(const std::shared_ptr<core::FlowFile> &flow_file, std::map<std::string, std::string>& merged_attributes) {
  const auto incoming = flow_file->getAttributes();
  auto current = merged_attributes.begin();
  while (current != merged_attributes.end()) {
    const auto counterpart = incoming.find(current->first);
    const bool shared = counterpart != incoming.end() && counterpart->second == current->second;
    if (shared) {
      ++current;
    } else {
      current = merged_attributes.erase(current);
    }
  }
}
/**
 * Adds flow_file's attributes to merged_attributes, keeping an attribute only
 * while all FlowFiles that carry it agree on its value.
 * On the first conflict the key is erased from merged_attributes and recorded
 * in removed_attributes_, so later FlowFiles cannot re-add it.
 */
void KeepAllUniqueAttributesMerger::processFlowFile(const std::shared_ptr<core::FlowFile> &flow_file, std::map<std::string, std::string>& merged_attributes) {
  const auto incoming = flow_file->getAttributes();
  for (const auto &attribute : incoming) {
    const std::string &key = attribute.first;
    const bool alreadyRemoved = std::find(removed_attributes_.cbegin(), removed_attributes_.cend(), key) != removed_attributes_.cend();
    if (alreadyRemoved) {
      continue;
    }
    const auto existing = merged_attributes.find(key);
    if (existing == merged_attributes.end()) {
      merged_attributes.insert(attribute);
    } else if (existing->second != attribute.second) {
      merged_attributes.erase(existing);
      removed_attributes_.push_back(key);
    }
  }
}
} /* namespace processors */
} /* namespace minifi */
} /* namespace nifi */
} /* namespace apache */
} /* namespace org */