// blob: 322f5a57af71f11944f12a69279f6bd09669dbf5 [file] [log] [blame]
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <memory>
#include <string>
#include <set>
#include <cinttypes>
#include "core/Repository.h"
#include "core/logging/LoggerConfiguration.h"
#include "utils/Id.h"
#include "core/FlowFile.h"
#include "utils/requirements/Container.h"
namespace org {
namespace apache {
namespace nifi {
namespace minifi {
namespace core {
// Static member definitions for FlowFile; all three are shared by every instance.
// Generator handle obtained from utils::IdGenerator::getIdGenerator().
std::shared_ptr<utils::IdGenerator> FlowFile::id_generator_ = utils::IdGenerator::getIdGenerator();
// Generator the FlowFile constructor uses to assign id_.
std::shared_ptr<utils::NonRepeatingStringGenerator> FlowFile::numeric_id_generator_ = std::make_shared<utils::NonRepeatingStringGenerator>();
// Logger used by FlowFile member functions (e.g. the overwrite warning in setStashClaim).
std::shared_ptr<logging::Logger> FlowFile::logger_ = logging::LoggerFactory<FlowFile>::getLogger();
/**
 * Creates an empty flow file record.
 * Sizes, offsets and queue/penalty timestamps start at zero, both flags start
 * cleared and no resource claim is attached. The record then receives an id
 * from numeric_id_generator_, and its entry date, event time and lineage start
 * date are all set to the same construction timestamp.
 */
FlowFile::FlowFile()
    : CoreComponent("FlowFile"),
      size_(0),
      stored(false),
      offset_(0),
      last_queue_date_(0),
      penaltyExpiration_ms_(0),
      event_time_(0),
      claim_(nullptr),
      marked_delete_(false) {
  id_ = numeric_id_generator_->generateId();

  // Read the clock once so the three timestamps are guaranteed to agree.
  const auto created_at = utils::timeutils::getTimeMillis();
  entry_date_ = created_at;
  event_time_ = created_at;
  lineage_start_date_ = created_at;
}
/**
 * Copy-assigns the state of another flow file record to this one.
 * Copies the UUID, the stored/deleted flags, entry and lineage dates, lineage
 * identifiers, last queue date, size, penalty expiration, attributes, resource
 * claim and connection.
 * NOTE(review): offset_, event_time_, id_ and stashedContent_ are not copied
 * here - confirm that this is intentional.
 * @param other record to copy from
 * @return reference to this record
 */
FlowFile& FlowFile::operator=(const FlowFile& other) {
uuid_ = other.uuid_;
stored = other.stored;
marked_delete_ = other.marked_delete_;
entry_date_ = other.entry_date_;
lineage_start_date_ = other.lineage_start_date_;
lineage_Identifiers_ = other.lineage_Identifiers_;
last_queue_date_ = other.last_queue_date_;
size_ = other.size_;
penaltyExpiration_ms_ = other.penaltyExpiration_ms_;
attributes_ = other.attributes_;
// The claim and connection are assigned as handles, not deep-copied.
claim_ = other.claim_;
connection_ = other.connection_;
return *this;
}
/**
 * Reports whether this flow file record is currently marked as deleted.
 * @return the value of the deleted flag last set by the constructor
 * (false) or by setDeleted
 */
bool FlowFile::isDeleted() const {
return marked_delete_;
}
/**
 * Updates the deletion mark on this flow file record.
 * Marking the record as deleted additionally calls removeReferences();
 * clearing the mark only updates the flag.
 * @param deleted new value of the deleted flag
 */
void FlowFile::setDeleted(const bool deleted) {
  marked_delete_ = deleted;
  if (!marked_delete_) {
    return;
  }
  removeReferences();
}
/**
 * Returns the resource claim currently attached to this flow file.
 * @return the claim; null if none was set or after clearResourceClaim
 */
std::shared_ptr<ResourceClaim> FlowFile::getResourceClaim() {
return claim_;
}
/**
 * Detaches the resource claim from this flow file by resetting it to null.
 */
void FlowFile::clearResourceClaim() {
claim_ = nullptr;
}
/**
 * Attaches a resource claim to this flow file, replacing the current one.
 * @param claim claim to store
 */
void FlowFile::setResourceClaim(const std::shared_ptr<ResourceClaim>& claim) {
claim_ = claim;
}
std::shared_ptr<ResourceClaim> FlowFile::getStashClaim(const std::string& key) {
return stashedContent_[key];
}
/**
 * Stashes a claim under the given key.
 * If the key is already in use a warning is logged and the previously
 * stashed claim is replaced.
 * @param key stash key
 * @param claim claim to store under key
 */
void FlowFile::setStashClaim(const std::string& key, const std::shared_ptr<ResourceClaim>& claim) {
  const bool overwriting = hasStashClaim(key);
  if (overwriting) {
    logger_->log_warn("Stashing content of record %s to existing key %s; "
                      "existing content will be overwritten",
                      getUUIDStr(), key.c_str());
  }
  stashedContent_[key] = claim;
}
/**
 * Removes the claim stashed under the given key.
 * Does nothing when no claim is stashed under that key.
 * @param key stash key to remove
 */
void FlowFile::clearStashClaim(const std::string& key) {
  auto stashed = stashedContent_.find(key);
  const bool present = stashed != stashedContent_.end();
  if (!present) {
    return;
  }
  // Drop the claim reference first, then remove the entry itself.
  stashed->second = nullptr;
  stashedContent_.erase(stashed);
}
/**
 * Checks whether an entry exists in the stash for the given key.
 * @param key stash key to test
 * @return true if the stash contains key, false otherwise
 */
bool FlowFile::hasStashClaim(const std::string& key) {
  const auto entry = stashedContent_.find(key);
  const bool present = entry != stashedContent_.end();
  return present;
}
/**
 * Returns the entry date of this flow file.
 * @return entry_date_, which the constructor sets from
 * utils::timeutils::getTimeMillis()
 */
uint64_t FlowFile::getEntryDate() const {
return entry_date_;
}
/**
 * Returns the event time of this flow file.
 * @return event_time_, which the constructor initialises to the entry date
 */
uint64_t FlowFile::getEventTime() const {
return event_time_;
}
/**
 * Returns the lineage start date of this flow file.
 * @return lineage_start_date_, initialised to the entry date by the
 * constructor and replaceable through setLineageStartDate
 */
uint64_t FlowFile::getlineageStartDate() const {
return lineage_start_date_;
}
/**
 * Gives access to the lineage identifiers of this flow file.
 * @return mutable reference to the internal identifier list; changes made
 * through it modify this flow file directly
 */
std::vector<utils::Identifier> &FlowFile::getlineageIdentifiers() {
return lineage_Identifiers_;
}
/**
 * Looks up an attribute by name.
 * @param key name of the attribute to find
 * @param value receives a copy of the attribute value when the attribute
 * exists; left untouched otherwise
 * @return true if the attribute exists, false otherwise
 */
bool FlowFile::getAttribute(std::string key, std::string& value) const {
  const auto found = attributes_.find(key);
  const bool present = found != attributes_.end();
  if (!present) {
    return false;
  }
  value = found->second;
  return true;
}
/**
 * Returns the size recorded for this flow file.
 * @return size_ (zero for a freshly constructed flow file)
 */
uint64_t FlowFile::getSize() const {
return size_;
}
/**
 * Returns the offset recorded for this flow file.
 * @return offset_ (zero for a freshly constructed flow file)
 */
uint64_t FlowFile::getOffset() const {
return offset_;
}
/**
 * Removes an attribute from this flow file.
 * @param key name of the attribute to remove
 * @return true if the attribute existed and was removed, false if there was
 * no attribute with that name
 */
bool FlowFile::removeAttribute(const std::string key) {
  auto target = attributes_.find(key);
  const bool present = target != attributes_.end();
  if (present) {
    attributes_.erase(target);
  }
  return present;
}
/**
 * Replaces the value of an existing attribute.
 * An attribute that does not exist yet is NOT created.
 * @param key name of the attribute to update
 * @param value new value for the attribute
 * @return true if the attribute existed and was updated, false otherwise
 */
bool FlowFile::updateAttribute(const std::string key, const std::string value) {
  auto existing = attributes_.find(key);
  const bool present = existing != attributes_.end();
  if (present) {
    existing->second = value;
  }
  return present;
}
/**
 * Adds a new attribute to this flow file.
 * An attribute that already exists is left unchanged.
 * @param key name of the attribute to add
 * @param value value of the new attribute
 * @return true if the attribute was added, false if an attribute with that
 * name already existed
 */
bool FlowFile::addAttribute(const std::string& key, const std::string& value) {
  const bool already_present = attributes_.find(key) != attributes_.end();
  if (already_present) {
    // Never overwrite: existing attributes are changed through updateAttribute.
    return false;
  }
  attributes_[key] = value;
  return true;
}
/**
 * Overwrites the lineage start date of this flow file.
 * @param date new value stored in lineage_start_date_
 */
void FlowFile::setLineageStartDate(const uint64_t date) {
lineage_start_date_ = date;
}
/**
 * Sets the original connection of this record, replacing any connection
 * stored previously.
 * @param connection shared pointer to the connection to store
 */
void FlowFile::setConnection(const std::shared_ptr<core::Connectable>& connection) {
connection_ = connection;
}
/**
 * Returns the original connection referenced by this record.
 * @return copy of the shared pointer stored by setConnection or copied in
 * by operator=
 */
std::shared_ptr<core::Connectable> FlowFile::getConnection() const {
return connection_;
}
// Definitions of the SpecialFlowAttribute string constants. Each constant
// holds the textual name of one well-known attribute.
// NOTE(review): presumably these are used as keys into a flow file's
// attribute map - confirm at the call sites.
const std::string SpecialFlowAttribute::PATH = "path";
const std::string SpecialFlowAttribute::ABSOLUTE_PATH = "absolute.path";
const std::string SpecialFlowAttribute::FILENAME = "filename";
const std::string SpecialFlowAttribute::UUID = "uuid";
// Unlike its siblings, this constant's identifier is spelled in lower case.
const std::string SpecialFlowAttribute::priority = "priority";
const std::string SpecialFlowAttribute::MIME_TYPE = "mime.type";
const std::string SpecialFlowAttribute::DISCARD_REASON = "discard.reason";
const std::string SpecialFlowAttribute::ALTERNATE_IDENTIFIER = "alternate.identifier";
const std::string SpecialFlowAttribute::FLOW_ID = "flow.id";
} /* namespace core */
namespace utils {
// Explicit instantiation of assert_container for the FlowFile attribute map type.
// NOTE(review): presumably this forces the container requirement checks from
// utils/requirements/Container.h to be evaluated for AttributeMap in this
// translation unit - confirm against that header.
template struct assert_container<core::FlowFile::AttributeMap>;
} /* namespace utils */
} /* namespace minifi */
} /* namespace nifi */
} /* namespace apache */
} /* namespace org */