blob: 8bd3f79bc786a3c551c43e4106b16e8b38d6bfe6 [file] [log] [blame]
/**
* @file MergeContent.cpp
* MergeContent class implementation
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "MergeContent.h"
#include <stdio.h>
#include <memory>
#include <string>
#include <vector>
#include <set>
#include <queue>
#include <map>
#include <deque>
#include <utility>
#include <algorithm>
#include "utils/TimeUtil.h"
#include "utils/StringUtils.h"
#include "core/ProcessContext.h"
#include "core/ProcessSession.h"
namespace org {
namespace apache {
namespace nifi {
namespace minifi {
namespace processors {
// Definitions of the static configuration properties and the output relationship
// declared by MergeContent. Each core::Property is built from three strings:
// (name, description, value). The third argument is presumably the default
// value -- confirm against core::Property.

// Merge algorithm selection; initialised with MERGE_STRATEGY_DEFRAGMENT.
core::Property MergeContent::MergeStrategy("Merge Strategy", "Defragment or Bin-Packing Algorithm", MERGE_STRATEGY_DEFRAGMENT);
// Output format selection; initialised with MERGE_FORMAT_CONCAT_VALUE.
core::Property MergeContent::MergeFormat("Merge Format", "Merge Format", MERGE_FORMAT_CONCAT_VALUE);
// Name of the flow file attribute that getGroupId() reads to group flow files; empty here.
core::Property MergeContent::CorrelationAttributeName("Correlation Attribute Name", "Correlation Attribute Name", "");
// Controls whether Header/Footer/Demarcator are treated as file names or as literal text
// (see onSchedule()). NOTE: the identifier is spelled "Stratgey"; it is used under that
// spelling elsewhere in this file, so it is left as is.
core::Property MergeContent::DelimiterStratgey("Delimiter Strategy", "Determines if Header, Footer, and Demarcator should point to files", DELIMITER_STRATEGY_FILENAME);
// Header, footer and demarcator settings; all empty here.
core::Property MergeContent::Header("Header File", "Filename specifying the header to use", "");
core::Property MergeContent::Footer("Footer File", "Filename specifying the footer to use", "");
core::Property MergeContent::Demarcator("Demarcator File", "Filename specifying the demarcator to use", "");
// Parsed into the boolean keepPath_ by onSchedule(); initialised with "false".
core::Property MergeContent::KeepPath("Keep Path", "If using the Zip or Tar Merge Format, specifies whether or not the FlowFiles' paths should be included in their entry", "false");
// Relationship that processBin() transfers the merged flow file to.
core::Relationship MergeContent::Merge("merged", "The FlowFile containing the merged content");
// MIME type strings, one per merge implementation. Each merge() in this file stores the
// result of getMergedContentType() in the MIME_TYPE attribute of the merged flow file;
// presumably that accessor returns these constants -- confirm in MergeContent.h.
const char *BinaryConcatenationMerge::mimeType = "application/octet-stream";
const char *TarMerge::mimeType = "application/tar";
const char *ZipMerge::mimeType = "application/zip";
void MergeContent::initialize() {
// Set the supported properties
std::set<core::Property> properties;
properties.insert(MinSize);
properties.insert(MaxSize);
properties.insert(MinEntries);
properties.insert(MaxEntries);
properties.insert(MaxBinAge);
properties.insert(MaxBinCount);
properties.insert(MergeStrategy);
properties.insert(MergeFormat);
properties.insert(CorrelationAttributeName);
properties.insert(DelimiterStratgey);
properties.insert(Header);
properties.insert(Footer);
properties.insert(Demarcator);
properties.insert(KeepPath);
setSupportedProperties(properties);
// Set the supported relationships
std::set<core::Relationship> relationships;
relationships.insert(Original);
relationships.insert(Failure);
relationships.insert(Merge);
setSupportedRelationships(relationships);
}
/**
 * Reads the whole file at the given path into a string, in binary mode.
 *
 * @param path file system path of the file to read
 * @return the bytes that could be read; an empty string when the file cannot be
 *         opened, is empty, or its size cannot be determined
 */
std::string MergeContent::readContent(std::string path) {
  std::string contents;
  std::ifstream in(path.c_str(), std::ios::in | std::ios::binary);
  if (!in) {
    return contents;
  }

  // Seek to the end to learn the size. tellg() reports -1 when the stream is in a
  // failed state (for example the path opened but is not seekable); feeding that to
  // resize() would request an enormous buffer and throw.
  in.seekg(0, std::ios::end);
  const std::streamoff length = in.tellg();
  if (length <= 0) {
    return contents;
  }

  contents.resize(static_cast<std::string::size_type>(length));
  in.seekg(0, std::ios::beg);
  in.read(&contents[0], static_cast<std::streamsize>(contents.size()));
  // Keep only what was really read so that a short read does not leave zero padding.
  contents.resize(static_cast<std::string::size_type>(in.gcount()));
  return contents;
}
/**
 * Reads the processor configuration when the processor is scheduled.
 *
 * Runs BinFiles::onSchedule first, then copies every property that is set to a
 * non-empty value into the matching member. A property that is missing or empty
 * leaves the member untouched. Finally the header, footer and demarcator contents
 * are resolved according to the delimiter strategy.
 *
 * @param context        source of the configured property values
 * @param sessionFactory forwarded to BinFiles::onSchedule
 */
void MergeContent::onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory) {
  BinFiles::onSchedule(context, sessionFactory);

  std::string strategy;
  if (context->getProperty(MergeStrategy.getName(), strategy) && !strategy.empty()) {
    mergeStratgey_ = strategy;
  }

  std::string format;
  if (context->getProperty(MergeFormat.getName(), format) && !format.empty()) {
    mergeFormat_ = format;
  }

  std::string correlationAttribute;
  if (context->getProperty(CorrelationAttributeName.getName(), correlationAttribute) && !correlationAttribute.empty()) {
    correlationAttributeName_ = correlationAttribute;
  }

  std::string delimiterStrategy;
  if (context->getProperty(DelimiterStratgey.getName(), delimiterStrategy) && !delimiterStrategy.empty()) {
    delimiterStratgey_ = delimiterStrategy;
  }

  std::string headerSetting;
  if (context->getProperty(Header.getName(), headerSetting) && !headerSetting.empty()) {
    header_ = headerSetting;
  }

  std::string footerSetting;
  if (context->getProperty(Footer.getName(), footerSetting) && !footerSetting.empty()) {
    footer_ = footerSetting;
  }

  std::string demarcatorSetting;
  if (context->getProperty(Demarcator.getName(), demarcatorSetting) && !demarcatorSetting.empty()) {
    demarcator_ = demarcatorSetting;
  }

  std::string keepPathSetting;
  if (context->getProperty(KeepPath.getName(), keepPathSetting) && !keepPathSetting.empty()) {
    org::apache::nifi::minifi::utils::StringUtils::StringToBool(keepPathSetting, keepPath_);
  }

  // In defragment mode the bin manager is given the fragment count attribute name.
  if (mergeStratgey_ == MERGE_STRATEGY_DEFRAGMENT) {
    binManager_.setFileCount(FRAGMENT_COUNT_ATTRIBUTE);
  }

  logger_->log_debug("Merge Content: Strategy [%s] Format [%s] Correlation Attribute [%s] Delimiter [%s]", mergeStratgey_, mergeFormat_, correlationAttributeName_, delimiterStratgey_);
  logger_->log_debug("Merge Content: Footer [%s] Header [%s] Demarcator [%s] KeepPath [%d]", footer_, header_, demarcator_, keepPath_);

  // Filename strategy: each non-empty setting names a file whose content is loaded.
  if (delimiterStratgey_ == DELIMITER_STRATEGY_FILENAME) {
    if (!header_.empty()) {
      headerContent_ = readContent(header_);
    }
    if (!footer_.empty()) {
      footerContent_ = readContent(footer_);
    }
    if (!demarcator_.empty()) {
      demarcatorContent_ = readContent(demarcator_);
    }
  }

  // Text strategy: the settings are themselves the content.
  if (delimiterStratgey_ == DELIMITER_STRATEGY_TEXT) {
    headerContent_ = header_;
    footerContent_ = footer_;
    demarcatorContent_ = demarcator_;
  }
}
/**
 * Determines the group identifier of a flow file.
 *
 * The configured correlation attribute is tried first. If that gives nothing and
 * the merge strategy is defragment, the fragment identifier attribute is used.
 *
 * @param context not used by this implementation
 * @param flow    flow file whose attributes are inspected
 * @return the group identifier, or an empty string when none was found
 */
std::string MergeContent::getGroupId(core::ProcessContext *context, std::shared_ptr<core::FlowFile> flow) {
  std::string groupId;
  std::string attributeValue;

  const bool hasCorrelationAttribute = !correlationAttributeName_.empty();
  if (hasCorrelationAttribute && flow->getAttribute(correlationAttributeName_, attributeValue)) {
    groupId = attributeValue;
  }

  if (!groupId.empty() || mergeStratgey_ != MERGE_STRATEGY_DEFRAGMENT) {
    return groupId;
  }

  // Defragment mode with no correlation value: fall back to the fragment identifier.
  if (flow->getAttribute(FRAGMENT_ID_ATTRIBUTE, attributeValue)) {
    groupId = attributeValue;
  }
  return groupId;
}
bool MergeContent::checkDefragment(std::unique_ptr<Bin> &bin) {
std::deque<std::shared_ptr<core::FlowFile>> &flows = bin->getFlowFile();
if (!flows.empty()) {
std::shared_ptr<core::FlowFile> front = flows.front();
std::string fragId;
if (!front->getAttribute(BinFiles::FRAGMENT_ID_ATTRIBUTE, fragId))
return false;
std::string fragCount;
if (!front->getAttribute(BinFiles::FRAGMENT_COUNT_ATTRIBUTE, fragCount))
return false;
int fragCountInt;
try {
fragCountInt = std::stoi(fragCount);
}
catch (...) {
return false;
}
for (auto flow : flows) {
std::string value;
if (!flow->getAttribute(BinFiles::FRAGMENT_ID_ATTRIBUTE, value))
return false;
if (value != fragId)
return false;
if (!flow->getAttribute(BinFiles::FRAGMENT_COUNT_ATTRIBUTE, value))
return false;
if (value != fragCount)
return false;
if (!flow->getAttribute(BinFiles::FRAGMENT_INDEX_ATTRIBUTE, value))
return false;
try {
int index = std::stoi(value);
if (index < 0 || index >= fragCountInt)
return false;
}
catch (...) {
return false;
}
}
} else {
return false;
}
return true;
}
/**
 * Trigger entry point of the processor. Adds no behaviour of its own: the call is
 * forwarded unchanged to BinFiles::onTrigger.
 *
 * @param context forwarded to BinFiles::onTrigger
 * @param session forwarded to BinFiles::onTrigger
 */
void MergeContent::onTrigger(core::ProcessContext *context, core::ProcessSession *session) {
BinFiles::onTrigger(context, session);
}
/**
 * Merges the flow files of one bin into a single flow file.
 *
 * Steps:
 *  1. Reject merge strategies other than defragment and bin-pack.
 *  2. In defragment mode, validate the fragment attributes and order the flow
 *     files by ascending fragment index.
 *  3. Pick the merge implementation matching the configured format and run it.
 *  4. Stamp the merged flow file with the fragment count attribute, transfer it
 *     to the Merge relationship and transfer every input to Original.
 *
 * @param context forwarded to the merge implementation
 * @param session session used to create, modify and transfer flow files
 * @param bin     bin holding the flow files to merge
 * @return true when the bin was merged; false when the strategy or format is
 *         unsupported, fragment validation fails, or the merge throws
 */
bool MergeContent::processBin(core::ProcessContext *context, core::ProcessSession *session, std::unique_ptr<Bin> &bin) {
  if (mergeStratgey_ != MERGE_STRATEGY_DEFRAGMENT && mergeStratgey_ != MERGE_STRATEGY_BIN_PACK)
    return false;

  if (mergeStratgey_ == MERGE_STRATEGY_DEFRAGMENT) {
    // check the flowfile fragment values
    if (!checkDefragment(bin)) {
      logger_->log_error("Merge Content check defgrament failed");
      return false;
    }
    // sort the flowfile fragment index; checkDefragment() has already confirmed
    // that every index parses, so std::stoi is not expected to throw here
    std::deque<std::shared_ptr<core::FlowFile>> &fragments = bin->getFlowFile();
    std::sort(fragments.begin(), fragments.end(),
        [] (const std::shared_ptr<core::FlowFile> &first, const std::shared_ptr<core::FlowFile> &second) -> bool {
          std::string value;
          first->getAttribute(BinFiles::FRAGMENT_INDEX_ATTRIBUTE, value);
          const int indexFirst = std::stoi(value);
          second->getAttribute(BinFiles::FRAGMENT_INDEX_ATTRIBUTE, value);
          const int indexSecond = std::stoi(value);
          return indexFirst < indexSecond;
        });
  }

  // Select the merge implementation. A single chain replaces the former nested
  // check, whose inner fallback branch could never be reached.
  std::unique_ptr<MergeBin> mergeBin;
  if (mergeFormat_ == MERGE_FORMAT_CONCAT_VALUE) {
    mergeBin.reset(new BinaryConcatenationMerge());
  } else if (mergeFormat_ == MERGE_FORMAT_TAR_VALUE) {
    mergeBin.reset(new TarMerge());
  } else if (mergeFormat_ == MERGE_FORMAT_ZIP_VALUE) {
    mergeBin.reset(new ZipMerge());
  } else {
    logger_->log_error("Merge format not supported %s", mergeFormat_);
    return false;
  }

  std::shared_ptr<core::FlowFile> mergeFlow;
  try {
    mergeFlow = mergeBin->merge(context, session, bin->getFlowFile(), this->headerContent_, this->footerContent_, this->demarcatorContent_);
  } catch (...) {
    logger_->log_error("Merge Content merge catch exception");
    return false;
  }
  session->putAttribute(mergeFlow, BinFiles::FRAGMENT_COUNT_ATTRIBUTE, std::to_string(bin->getSize()));

  // we successfully merge the flow
  session->transfer(mergeFlow, Merge);
  for (const auto &flow : bin->getFlowFile()) {
    session->transfer(flow, Original);
  }

  // The payload length is presumably a 64-bit count; casting to unsigned long long
  // and using %llu keeps the format specifier and the argument in agreement for
  // any integer width, which %d did not.
  logger_->log_info("Merge FlowFile record UUID %s, payload length %llu", mergeFlow->getUUIDStr(),
      static_cast<unsigned long long>(mergeFlow->getSize()));
  return true;
}
/**
 * Creates a new flow file and fills it through BinaryConcatenationMerge::WriteCallback,
 * which receives the header, footer, demarcator and the input flow files.
 *
 * The MIME_TYPE attribute is set from getMergedContentType(). The FILENAME attribute
 * is taken from the first input: its FILENAME when there is exactly one input,
 * otherwise its SEGMENT_ORIGINAL_FILENAME. It is only set when that lookup yields a
 * non-empty name.
 *
 * @param context    not used by this implementation
 * @param session    session used to create and write the merged flow file
 * @param flows      input flow files; the first element is read, so it must not be empty
 * @param header     passed to the write callback
 * @param footer     passed to the write callback
 * @param demarcator passed to the write callback
 * @return the merged flow file
 */
std::shared_ptr<core::FlowFile> BinaryConcatenationMerge::merge(core::ProcessContext *context, core::ProcessSession *session,
    std::deque<std::shared_ptr<core::FlowFile>> &flows, std::string &header, std::string &footer, std::string &demarcator) {
  auto merged = std::static_pointer_cast<FlowFileRecord>(session->create());

  BinaryConcatenationMerge::WriteCallback writer(header, footer, demarcator, flows, session);
  session->write(merged, &writer);
  session->putAttribute(merged, FlowAttributeKey(MIME_TYPE), this->getMergedContentType());

  // Derive the output name from the first input flow file.
  std::string mergedName;
  if (flows.size() != 1) {
    flows.front()->getAttribute(BinFiles::SEGMENT_ORIGINAL_FILENAME, mergedName);
  } else {
    flows.front()->getAttribute(FlowAttributeKey(FILENAME), mergedName);
  }

  if (mergedName.empty()) {
    return merged;
  }
  session->putAttribute(merged, FlowAttributeKey(FILENAME), mergedName);
  return merged;
}
/**
 * Creates a new flow file and fills it through ArchiveMerge::WriteCallback, which is
 * constructed with the tar merge format value and the input flow files.
 *
 * The MIME_TYPE attribute is set from getMergedContentType(). The name starts as the
 * new flow file's own FILENAME and is then looked up on the first input (FILENAME for
 * a single input, SEGMENT_ORIGINAL_FILENAME otherwise). A non-empty result gets ".tar"
 * appended and is stored as FILENAME.
 *
 * @param context    not used by this implementation
 * @param session    session used to create and write the merged flow file
 * @param flows      input flow files; the first element is read, so it must not be empty
 * @param header     not used by this implementation
 * @param footer     not used by this implementation
 * @param demarcator not used by this implementation
 * @return the merged flow file
 */
std::shared_ptr<core::FlowFile> TarMerge::merge(core::ProcessContext *context, core::ProcessSession *session, std::deque<std::shared_ptr<core::FlowFile>> &flows, std::string &header,
    std::string &footer, std::string &demarcator) {
  auto archive = std::static_pointer_cast<FlowFileRecord>(session->create());

  ArchiveMerge::WriteCallback writer(std::string(MERGE_FORMAT_TAR_VALUE), flows, session);
  session->write(archive, &writer);
  session->putAttribute(archive, FlowAttributeKey(MIME_TYPE), this->getMergedContentType());

  // Start from the archive's own name, then look the name up on the first input.
  std::string archiveName;
  archive->getAttribute(FlowAttributeKey(FILENAME), archiveName);
  if (flows.size() != 1) {
    flows.front()->getAttribute(BinFiles::SEGMENT_ORIGINAL_FILENAME, archiveName);
  } else {
    flows.front()->getAttribute(FlowAttributeKey(FILENAME), archiveName);
  }

  if (archiveName.empty()) {
    return archive;
  }
  archiveName += ".tar";
  session->putAttribute(archive, FlowAttributeKey(FILENAME), archiveName);
  return archive;
}
/**
 * Creates a new flow file and fills it through ArchiveMerge::WriteCallback, which is
 * constructed with the zip merge format value and the input flow files.
 *
 * The MIME_TYPE attribute is set from getMergedContentType(). The name starts as the
 * new flow file's own FILENAME and is then looked up on the first input (FILENAME for
 * a single input, SEGMENT_ORIGINAL_FILENAME otherwise). A non-empty result gets ".zip"
 * appended and is stored as FILENAME.
 *
 * @param context    not used by this implementation
 * @param session    session used to create and write the merged flow file
 * @param flows      input flow files; the first element is read, so it must not be empty
 * @param header     not used by this implementation
 * @param footer     not used by this implementation
 * @param demarcator not used by this implementation
 * @return the merged flow file
 */
std::shared_ptr<core::FlowFile> ZipMerge::merge(core::ProcessContext *context, core::ProcessSession *session, std::deque<std::shared_ptr<core::FlowFile>> &flows, std::string &header,
    std::string &footer, std::string &demarcator) {
  auto archive = std::static_pointer_cast<FlowFileRecord>(session->create());

  ArchiveMerge::WriteCallback writer(std::string(MERGE_FORMAT_ZIP_VALUE), flows, session);
  session->write(archive, &writer);
  session->putAttribute(archive, FlowAttributeKey(MIME_TYPE), this->getMergedContentType());

  // Start from the archive's own name, then look the name up on the first input.
  std::string archiveName;
  archive->getAttribute(FlowAttributeKey(FILENAME), archiveName);
  if (flows.size() != 1) {
    flows.front()->getAttribute(BinFiles::SEGMENT_ORIGINAL_FILENAME, archiveName);
  } else {
    flows.front()->getAttribute(FlowAttributeKey(FILENAME), archiveName);
  }

  if (archiveName.empty()) {
    return archive;
  }
  archiveName += ".zip";
  session->putAttribute(archive, FlowAttributeKey(FILENAME), archiveName);
  return archive;
}
} /* namespace processors */
} /* namespace minifi */
} /* namespace nifi */
} /* namespace apache */
} /* namespace org */