blob: 7f0819a28bf3e6225624640bc32915e14ae31256 [file] [log] [blame]
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <memory>
#include <string>
#include <set>
#include <cinttypes>
#include "utils/Id.h"
#include "core/FlowFile.h"
#include "utils/requirements/Container.h"
#include "core/logging/LoggerFactory.h"
namespace org::apache::nifi::minifi {
namespace core {
std::shared_ptr<utils::IdGenerator> FlowFileImpl::id_generator_ = utils::IdGenerator::getIdGenerator();
std::shared_ptr<utils::NonRepeatingStringGenerator> FlowFileImpl::numeric_id_generator_ = std::make_shared<utils::NonRepeatingStringGenerator>();
std::shared_ptr<logging::Logger> FlowFileImpl::logger_ = logging::LoggerFactory<FlowFile>::getLogger();
FlowFileImpl::FlowFileImpl()
: CoreComponentImpl("FlowFile"),
stored(false),
marked_delete_(false),
entry_date_(std::chrono::system_clock::now()),
event_time_(entry_date_),
lineage_start_date_(entry_date_),
last_queue_date_(0),
size_(0),
id_(numeric_id_generator_->generateId()),
offset_(0),
to_be_processed_after_(std::chrono::steady_clock::now()) {
}
FlowFileImpl& FlowFileImpl::operator=(const FlowFileImpl& other) {
if (this == &other) {
return *this;
}
uuid_ = other.uuid_;
stored = other.stored;
marked_delete_ = other.marked_delete_;
entry_date_ = other.entry_date_;
lineage_start_date_ = other.lineage_start_date_;
lineage_Identifiers_ = other.lineage_Identifiers_;
last_queue_date_ = other.last_queue_date_;
size_ = other.size_;
to_be_processed_after_ = other.to_be_processed_after_;
attributes_ = other.attributes_;
claim_ = other.claim_;
connection_ = other.connection_;
return *this;
}
/**
* Returns whether or not this flow file record
* is marked as deleted.
* @return marked deleted
*/
bool FlowFileImpl::isDeleted() const {
return marked_delete_;
}
/**
* Sets whether to mark this flow file record
* as deleted
* @param deleted deleted flag
*/
void FlowFileImpl::setDeleted(const bool deleted) {
marked_delete_ = deleted;
if (marked_delete_) {
removeReferences();
}
}
std::shared_ptr<ResourceClaim> FlowFileImpl::getResourceClaim() const {
return claim_;
}
void FlowFileImpl::clearResourceClaim() {
claim_ = nullptr;
}
void FlowFileImpl::setResourceClaim(const std::shared_ptr<ResourceClaim>& claim) {
claim_ = claim;
}
std::shared_ptr<ResourceClaim> FlowFileImpl::getStashClaim(const std::string& key) {
return stashedContent_[key];
}
void FlowFileImpl::setStashClaim(const std::string& key, const std::shared_ptr<ResourceClaim>& claim) {
if (hasStashClaim(key)) {
logger_->log_warn("Stashing content of record {} to existing key {}; "
"existing content will be overwritten",
getUUIDStr(), key.c_str());
}
stashedContent_[key] = claim;
}
void FlowFileImpl::clearStashClaim(const std::string& key) {
auto claimIt = stashedContent_.find(key);
if (claimIt != stashedContent_.end()) {
claimIt->second = nullptr;
stashedContent_.erase(claimIt);
}
}
bool FlowFileImpl::hasStashClaim(const std::string& key) {
return stashedContent_.find(key) != stashedContent_.end();
}
// ! Get Entry Date
std::chrono::system_clock::time_point FlowFileImpl::getEntryDate() const {
return entry_date_;
}
std::chrono::system_clock::time_point FlowFileImpl::getEventTime() const {
return event_time_;
}
// ! Get Lineage Start Date
std::chrono::system_clock::time_point FlowFileImpl::getlineageStartDate() const {
return lineage_start_date_;
}
const std::vector<utils::Identifier>& FlowFileImpl::getlineageIdentifiers() const {
return lineage_Identifiers_;
}
std::vector<utils::Identifier>& FlowFileImpl::getlineageIdentifiers() {
return lineage_Identifiers_;
}
bool FlowFileImpl::getAttribute(std::string_view key, std::string& value) const {
const auto attribute = getAttribute(key);
if (!attribute) {
return false;
}
value = attribute.value();
return true;
}
std::optional<std::string> FlowFileImpl::getAttribute(std::string_view key) const {
auto it = attributes_.find(key);
if (it != attributes_.end()) {
return it->second;
}
return std::nullopt;
}
// Get Size
uint64_t FlowFileImpl::getSize() const {
return size_;
}
// ! Get Offset
uint64_t FlowFileImpl::getOffset() const {
return offset_;
}
bool FlowFileImpl::removeAttribute(std::string_view key) {
auto it = attributes_.find(key);
if (it != attributes_.end()) {
attributes_.erase(it);
return true;
} else {
return false;
}
}
bool FlowFileImpl::updateAttribute(std::string_view key, const std::string& value) {
auto it = attributes_.find(key);
if (it != attributes_.end()) {
it->second = value;
return true;
} else {
return false;
}
}
bool FlowFileImpl::addAttribute(std::string_view key, const std::string& value) {
auto it = attributes_.find(key);
if (it != attributes_.end()) {
// attribute already there in the map
return false;
} else {
attributes_[key] = value;
return true;
}
}
void FlowFileImpl::setLineageStartDate(const std::chrono::system_clock::time_point date) {
lineage_start_date_ = date;
}
/**
* Sets the original connection with a shared pointer.
* @param connection shared connection.
*/
void FlowFileImpl::setConnection(core::Connectable* connection) {
connection_ = connection;
}
/**
* Returns the original connection referenced by this record.
* @return shared original connection pointer.
*/
core::Connectable* FlowFileImpl::getConnection() const {
return connection_;
}
} /* namespace core */
namespace utils {
template struct assert_container<core::FlowFile::AttributeMap>;
} /* namespace utils */
} // namespace org::apache::nifi::minifi