blob: 035ed2905a2097aaa8d66468c767800d4445d00d [file] [log] [blame]
/**
* @file BinFiles.cpp
* BinFiles class implementation
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "BinFiles.h"
#include <stdio.h>
#include <memory>
#include <string>
#include <vector>
#include <set>
#include <queue>
#include <map>
#include <deque>
#include <utility>
#include "utils/TimeUtil.h"
#include "utils/StringUtils.h"
#include "core/ProcessContext.h"
#include "core/ProcessSession.h"
namespace org {
namespace apache {
namespace nifi {
namespace minifi {
namespace processors {
// Processor properties. Each is constructed from a display name, a description and a third
// string that is presumably the default value (an empty string where no default applies) --
// TODO confirm against core::Property's constructor.
core::Property BinFiles::MinSize("Minimum Group Size", "The minimum size of for the bundle", "0");
core::Property BinFiles::MaxSize("Maximum Group Size", "The maximum size for the bundle. If not specified, there is no maximum.", "");
core::Property BinFiles::MinEntries("Minimum Number of Entries", "The minimum number of files to include in a bundle", "1");
core::Property BinFiles::MaxEntries("Maximum Number of Entries", "The maximum number of files to include in a bundle. If not specified, there is no maximum.", "");
core::Property BinFiles::MaxBinAge("Max Bin Age", "The maximum age of a Bin that will trigger a Bin to be complete. Expected format is <duration> <time unit>", "");
core::Property BinFiles::MaxBinCount("Maximum number of Bins", "Specifies the maximum number of bins that can be held in memory at any one time", "100");
// Relationships (name, description) that flow files can be routed to.
core::Relationship BinFiles::Original("original", "The FlowFiles that were used to create the bundle");
core::Relationship BinFiles::Failure("failure", "If the bundle cannot be created, all FlowFiles that would have been used to created the bundle will be transferred to failure");
// Flow file attribute names. preprocessFlowFile() copies the "segment.*" count/index/identifier
// attributes into their "fragment.*" counterparts for backward compatibility.
const char *BinFiles::FRAGMENT_COUNT_ATTRIBUTE = "fragment.count";
const char *BinFiles::FRAGMENT_ID_ATTRIBUTE = "fragment.identifier";
const char *BinFiles::FRAGMENT_INDEX_ATTRIBUTE = "fragment.index";
const char *BinFiles::SEGMENT_COUNT_ATTRIBUTE = "segment.count";
const char *BinFiles::SEGMENT_ID_ATTRIBUTE = "segment.identifier";
const char *BinFiles::SEGMENT_INDEX_ATTRIBUTE = "segment.index";
const char *BinFiles::SEGMENT_ORIGINAL_FILENAME = "segment.original.filename";
const char *BinFiles::TAR_PERMISSIONS_ATTRIBUTE = "tar.permissions";
void BinFiles::initialize() {
// Set the supported properties
std::set<core::Property> properties;
properties.insert(MinSize);
properties.insert(MaxSize);
properties.insert(MinEntries);
properties.insert(MaxEntries);
properties.insert(MaxBinAge);
properties.insert(MaxBinCount);
setSupportedProperties(properties);
// Set the supported relationships
std::set<core::Relationship> relationships;
relationships.insert(Original);
relationships.insert(Failure);
setSupportedRelationships(relationships);
}
/**
 * Reads the processor configuration from the context and applies it to the bin manager.
 *
 * Every property is optional: a property that is unset, empty or cannot be parsed is skipped
 * and the corresponding setting is left untouched.
 *
 * @param context         process context the property values are read from
 * @param sessionFactory  unused here
 */
void BinFiles::onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory) {
  // Fetches the named property and parses it as an integer. Returns false (leaving 'parsed'
  // unspecified) when the property is missing, empty or not parseable.
  auto readInt = [context](const std::string &name, int64_t &parsed) -> bool {
    std::string raw;
    return context->getProperty(name, raw) && !raw.empty() && core::Property::StringToInt(raw, parsed);
  };

  // The values are 64-bit, so they are logged with %lld and an explicit cast to long long;
  // the previous %d did not match the argument type.
  int64_t valInt = 0;
  if (readInt(MinSize.getName(), valInt)) {
    this->binManager_.setMinSize(valInt);
    logger_->log_debug("BinFiles: MinSize [%lld]", static_cast<long long>(valInt));
  }
  if (readInt(MaxSize.getName(), valInt)) {
    this->binManager_.setMaxSize(valInt);
    logger_->log_debug("BinFiles: MaxSize [%lld]", static_cast<long long>(valInt));
  }
  if (readInt(MinEntries.getName(), valInt)) {
    this->binManager_.setMinEntries(valInt);
    logger_->log_debug("BinFiles: MinEntries [%lld]", static_cast<long long>(valInt));
  }
  if (readInt(MaxEntries.getName(), valInt)) {
    this->binManager_.setMaxEntries(valInt);
    logger_->log_debug("BinFiles: MaxEntries [%lld]", static_cast<long long>(valInt));
  }
  if (readInt(MaxBinCount.getName(), valInt)) {
    maxBinCount_ = static_cast<int>(valInt);
    logger_->log_debug("BinFiles: MaxBinCount [%lld]", static_cast<long long>(valInt));
  }

  // Max Bin Age is given as "<duration> <time unit>" and is converted to milliseconds.
  std::string value;
  if (context->getProperty(MaxBinAge.getName(), value) && !value.empty()) {
    core::TimeUnit unit;
    if (core::Property::StringToTime(value, valInt, unit) && core::Property::ConvertTimeUnitToMS(valInt, unit, valInt)) {
      this->binManager_.setBinAge(valInt);
      logger_->log_debug("BinFiles: MaxBinAge [%lld]", static_cast<long long>(valInt));
    }
  }
}
/**
 * Backward-compatibility shim for the old "segment.*" attributes: for each of count, index and
 * identifier, when the flow file lacks the "fragment.*" attribute but carries the matching
 * "segment.*" one, the segment value is copied to the fragment attribute.
 *
 * @param context  unused here
 * @param session  unused here
 * @param flow     flow file whose attributes are inspected and possibly extended
 */
void BinFiles::preprocessFlowFile(core::ProcessContext *context, core::ProcessSession *session, std::shared_ptr<core::FlowFile> flow) {
  struct AttributeAlias {
    const char *current;  // attribute name the processor works with
    const char *legacy;   // older attribute name used as a fallback
  };
  const AttributeAlias aliases[] = {
    { BinFiles::FRAGMENT_COUNT_ATTRIBUTE, BinFiles::SEGMENT_COUNT_ATTRIBUTE },
    { BinFiles::FRAGMENT_INDEX_ATTRIBUTE, BinFiles::SEGMENT_INDEX_ATTRIBUTE },
    { BinFiles::FRAGMENT_ID_ATTRIBUTE, BinFiles::SEGMENT_ID_ATTRIBUTE },
  };

  std::string value;
  for (const AttributeAlias &alias : aliases) {
    if (flow->getAttribute(alias.current, value)) {
      continue;  // current attribute already present, nothing to migrate
    }
    if (flow->getAttribute(alias.legacy, value)) {
      flow->setAttribute(alias.current, value);
    }
  }
}
void BinManager::gatherReadyBins() {
std::lock_guard < std::mutex > lock(mutex_);
std::vector< std::string > emptyQueue;
for (std::map<std::string, std::unique_ptr<std::deque<std::unique_ptr<Bin>>> >::iterator it=groupBinMap_.begin(); it !=groupBinMap_.end(); ++it) {
std::unique_ptr < std::deque<std::unique_ptr<Bin>>>&queue = it->second;
while (!queue->empty()) {
std::unique_ptr<Bin> &bin = queue->front();
if (bin->isReadyForMerge() || (binAge_ != ULLONG_MAX && bin->isOlderThan(binAge_))) {
readyBin_.push_back(std::move(bin));
queue->pop_front();
binCount_--;
logger_->log_debug("BinManager move bin %s to ready bins for group %s", readyBin_.back()->getUUIDStr(), readyBin_.back()->getGroupId());
} else {
break;
}
}
if (queue->empty()) {
emptyQueue.push_back(it->first);
}
}
for (auto group : emptyQueue) {
// erase from the map if the queue is empty for the group
groupBinMap_.erase(group);
}
logger_->log_debug("BinManager groupBinMap size %d", groupBinMap_.size());
}
void BinManager::removeOldestBin() {
std::lock_guard < std::mutex > lock(mutex_);
uint64_t olddate = ULLONG_MAX;
std::unique_ptr < std::deque<std::unique_ptr<Bin>>>*oldqueue;
for (std::map<std::string, std::unique_ptr<std::deque<std::unique_ptr<Bin>>>>::iterator it=groupBinMap_.begin(); it !=groupBinMap_.end(); ++it) {
std::unique_ptr < std::deque<std::unique_ptr<Bin>>>&queue = it->second;
if (!queue->empty()) {
std::unique_ptr<Bin> &bin = queue->front();
if (bin->getBinAge() < olddate) {
olddate = bin->getBinAge();
oldqueue = &queue;
}
}
}
if (olddate != ULLONG_MAX) {
std::unique_ptr<Bin> &remove = (*oldqueue)->front();
std::string group = remove->getGroupId();
readyBin_.push_back(std::move(remove));
(*oldqueue)->pop_front();
binCount_--;
logger_->log_debug("BinManager move bin %s to ready bins for group %s", readyBin_.back()->getUUIDStr(), readyBin_.back()->getGroupId());
if ((*oldqueue)->empty()) {
groupBinMap_.erase(group);
}
}
logger_->log_debug("BinManager groupBinMap size %d", groupBinMap_.size());
}
/**
 * Hands every ready bin over to the caller, in order, leaving the internal ready list empty.
 * Runs under mutex_.
 *
 * @param retBins  container the ready bins are appended to
 */
void BinManager::getReadyBin(std::deque<std::unique_ptr<Bin>> &retBins) {
  std::lock_guard<std::mutex> guard(mutex_);
  for (std::unique_ptr<Bin> &ready : readyBin_) {
    retBins.push_back(std::move(ready));
  }
  // only moved-from (null) pointers remain at this point
  readyBin_.clear();
}
bool BinManager::offer(const std::string &group, std::shared_ptr<core::FlowFile> flow) {
std::lock_guard < std::mutex > lock(mutex_);
if (flow->getSize() > maxSize_) {
// could not be added to a bin -- too large by itself, so create a separate bin for just this guy.
std::unique_ptr<Bin> bin = std::unique_ptr < Bin > (new Bin(0, ULLONG_MAX, 1, INT_MAX, "", group));
if (!bin->offer(flow))
return false;
readyBin_.push_back(std::move(bin));
logger_->log_debug("BinManager move bin %s to ready bins for group %s", readyBin_.back()->getUUIDStr(), group);
return true;
}
auto search = groupBinMap_.find(group);
if (search != groupBinMap_.end()) {
std::unique_ptr < std::deque<std::unique_ptr<Bin>>>&queue = search->second;
if (!queue->empty()) {
std::unique_ptr<Bin> &tail = queue->back();
if (!tail->offer(flow)) {
// last bin can not offer the flow
std::unique_ptr<Bin> bin = std::unique_ptr < Bin > (new Bin(minSize_, maxSize_, minEntries_, maxEntries_, fileCount_, group));
if (!bin->offer(flow))
return false;
queue->push_back(std::move(bin));
logger_->log_debug("BinManager add bin %s to group %s", queue->back()->getUUIDStr(), group);
binCount_++;
}
} else {
std::unique_ptr<Bin> bin = std::unique_ptr < Bin > (new Bin(minSize_, maxSize_, minEntries_, maxEntries_, fileCount_, group));
if (!bin->offer(flow))
return false;
queue->push_back(std::move(bin));
binCount_++;
logger_->log_debug("BinManager add bin %s to group %s", queue->back()->getUUIDStr(), group);
}
} else {
std::unique_ptr<std::deque<std::unique_ptr<Bin>>> queue = std::unique_ptr<std::deque<std::unique_ptr<Bin>>> (new std::deque<std::unique_ptr<Bin>>());
std::unique_ptr<Bin> bin = std::unique_ptr < Bin > (new Bin(minSize_, maxSize_, minEntries_, maxEntries_, fileCount_, group));
if (!bin->offer(flow))
return false;
queue->push_back(std::move(bin));
logger_->log_debug("BinManager add bin %s to group %s", queue->back()->getUUIDStr(), group);
groupBinMap_.insert(std::make_pair(group, std::move(queue)));
binCount_++;
}
return true;
}
/**
 * One processing round: bins the incoming flow file (if any), collects the bins that are
 * ready and processes them in a separate merge session.
 *
 * @param context  process context
 * @param session  session the incoming flow file is taken from
 */
void BinFiles::onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) {
  std::shared_ptr<FlowFileRecord> incoming = std::static_pointer_cast<FlowFileRecord>(session->get());
  if (incoming != nullptr) {
    preprocessFlowFile(context.get(), session.get(), incoming);
    std::string groupId = getGroupId(context.get(), incoming);
    if (!this->binManager_.offer(groupId, incoming)) {
      // no bin accepted the flow file
      session->transfer(incoming, Failure);
      context->yield();
      return;
    }
    // the flow file leaves this session; it is added to the merge session later on
    session->remove(incoming);
  }

  // move completed bins to the ready list
  this->binManager_.gatherReadyBins();
  if (this->binManager_.getBinCount() > maxBinCount_) {
    // too many bins are held: back off and release the oldest one
    context->yield();
    logger_->log_debug("BinFiles reach max bin count %d", this->binManager_.getBinCount());
    this->binManager_.removeOldestBin();
  }

  std::deque<std::unique_ptr<Bin>> pending;
  binManager_.getReadyBin(pending);
  if (pending.empty()) {
    return;
  }

  // all ready bins of this round are handled in a single merge session
  core::ProcessSession mergeSession(context);
  while (!pending.empty()) {
    std::unique_ptr<Bin> bin = std::move(pending.front());
    pending.pop_front();
    // the bin's flow files have to be part of the merge session before processing
    this->addFlowsToSession(context.get(), &mergeSession, bin);
    logger_->log_debug("BinFiles start to process bin %s for group %s", bin->getUUIDStr(), bin->getGroupId());
    const bool processed = this->processBin(context.get(), &mergeSession, bin);
    if (!processed) {
      this->transferFlowsToFail(context.get(), &mergeSession, bin);
    }
  }
  mergeSession.commit();
}
/**
 * Routes every flow file held by the bin to the Failure relationship and then empties the bin.
 *
 * @param context  unused here
 * @param session  session used for the transfers
 * @param bin      bin whose flow files are failed
 */
void BinFiles::transferFlowsToFail(core::ProcessContext *context, core::ProcessSession *session, std::unique_ptr<Bin> &bin) {
  std::deque<std::shared_ptr<core::FlowFile>> &binned = bin->getFlowFile();
  for (auto it = binned.begin(); it != binned.end(); ++it) {
    std::shared_ptr<core::FlowFile> current = *it;
    session->transfer(current, Failure);
  }
  binned.clear();
}
/**
 * Adds every flow file held by the bin to the given session. The bin keeps its flow files.
 *
 * @param context  unused here
 * @param session  session the flow files are added to
 * @param bin      bin whose flow files are added
 */
void BinFiles::addFlowsToSession(core::ProcessContext *context, core::ProcessSession *session, std::unique_ptr<Bin> &bin) {
  std::deque<std::shared_ptr<core::FlowFile>> &binned = bin->getFlowFile();
  for (auto it = binned.begin(); it != binned.end(); ++it) {
    std::shared_ptr<core::FlowFile> current = *it;
    session->add(current);
  }
}
} /* namespace processors */
} /* namespace minifi */
} /* namespace nifi */
} /* namespace apache */
} /* namespace org */