|  | /** | 
|  | * | 
|  | * Licensed to the Apache Software Foundation (ASF) under one or more | 
|  | * contributor license agreements.  See the NOTICE file distributed with | 
|  | * this work for additional information regarding copyright ownership. | 
|  | * The ASF licenses this file to You under the Apache License, Version 2.0 | 
|  | * (the "License"); you may not use this file except in compliance with | 
|  | * the License.  You may obtain a copy of the License at | 
|  | * | 
|  | *     http://www.apache.org/licenses/LICENSE-2.0 | 
|  | * | 
|  | * Unless required by applicable law or agreed to in writing, software | 
|  | * distributed under the License is distributed on an "AS IS" BASIS, | 
|  | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | 
|  | * See the License for the specific language governing permissions and | 
|  | * limitations under the License. | 
|  | */ | 
|  |  | 
|  | #include <memory> | 
|  | #include <string> | 
|  | #include <set> | 
|  | #include <cinttypes> | 
|  | #include "utils/Id.h" | 
|  | #include "core/FlowFile.h" | 
|  | #include "utils/requirements/Container.h" | 
|  | #include "core/logging/LoggerFactory.h" | 
|  |  | 
|  | namespace org::apache::nifi::minifi { | 
|  | namespace core { | 
|  |  | 
|  | std::shared_ptr<utils::IdGenerator> FlowFileImpl::id_generator_ = utils::IdGenerator::getIdGenerator(); | 
|  | std::shared_ptr<utils::NonRepeatingStringGenerator> FlowFileImpl::numeric_id_generator_ = std::make_shared<utils::NonRepeatingStringGenerator>(); | 
|  | std::shared_ptr<logging::Logger> FlowFileImpl::logger_ = logging::LoggerFactory<FlowFile>::getLogger(); | 
|  |  | 
|  | FlowFileImpl::FlowFileImpl() | 
|  | : CoreComponentImpl("FlowFile"), | 
|  | stored(false), | 
|  | marked_delete_(false), | 
|  | entry_date_(std::chrono::system_clock::now()), | 
|  | event_time_(entry_date_), | 
|  | lineage_start_date_(entry_date_), | 
|  | last_queue_date_(0), | 
|  | size_(0), | 
|  | id_(numeric_id_generator_->generateId()), | 
|  | offset_(0), | 
|  | to_be_processed_after_(std::chrono::steady_clock::now()) { | 
|  | } | 
|  |  | 
|  | FlowFileImpl& FlowFileImpl::operator=(const FlowFileImpl& other) { | 
|  | if (this == &other) { | 
|  | return *this; | 
|  | } | 
|  | uuid_ = other.uuid_; | 
|  | stored = other.stored; | 
|  | marked_delete_ = other.marked_delete_; | 
|  | entry_date_ = other.entry_date_; | 
|  | lineage_start_date_ = other.lineage_start_date_; | 
|  | lineage_Identifiers_ = other.lineage_Identifiers_; | 
|  | last_queue_date_ = other.last_queue_date_; | 
|  | size_ = other.size_; | 
|  | to_be_processed_after_ = other.to_be_processed_after_; | 
|  | attributes_ = other.attributes_; | 
|  | claim_ = other.claim_; | 
|  | connection_ = other.connection_; | 
|  | return *this; | 
|  | } | 
|  |  | 
|  | /** | 
|  | * Returns whether or not this flow file record | 
|  | * is marked as deleted. | 
|  | * @return marked deleted | 
|  | */ | 
|  | bool FlowFileImpl::isDeleted() const { | 
|  | return marked_delete_; | 
|  | } | 
|  |  | 
|  | /** | 
|  | * Sets whether to mark this flow file record | 
|  | * as deleted | 
|  | * @param deleted deleted flag | 
|  | */ | 
|  | void FlowFileImpl::setDeleted(const bool deleted) { | 
|  | marked_delete_ = deleted; | 
|  | if (marked_delete_) { | 
|  | removeReferences(); | 
|  | } | 
|  | } | 
|  |  | 
|  | std::shared_ptr<ResourceClaim> FlowFileImpl::getResourceClaim() const { | 
|  | return claim_; | 
|  | } | 
|  |  | 
|  | void FlowFileImpl::clearResourceClaim() { | 
|  | claim_ = nullptr; | 
|  | } | 
|  | void FlowFileImpl::setResourceClaim(const std::shared_ptr<ResourceClaim>& claim) { | 
|  | claim_ = claim; | 
|  | } | 
|  |  | 
|  | std::shared_ptr<ResourceClaim> FlowFileImpl::getStashClaim(const std::string& key) { | 
|  | return stashedContent_[key]; | 
|  | } | 
|  |  | 
|  | void FlowFileImpl::setStashClaim(const std::string& key, const std::shared_ptr<ResourceClaim>& claim) { | 
|  | if (hasStashClaim(key)) { | 
|  | logger_->log_warn("Stashing content of record {} to existing key {}; " | 
|  | "existing content will be overwritten", | 
|  | getUUIDStr(), key.c_str()); | 
|  | } | 
|  |  | 
|  | stashedContent_[key] = claim; | 
|  | } | 
|  |  | 
|  | void FlowFileImpl::clearStashClaim(const std::string& key) { | 
|  | auto claimIt = stashedContent_.find(key); | 
|  | if (claimIt != stashedContent_.end()) { | 
|  | claimIt->second = nullptr; | 
|  | stashedContent_.erase(claimIt); | 
|  | } | 
|  | } | 
|  |  | 
|  | bool FlowFileImpl::hasStashClaim(const std::string& key) { | 
|  | return stashedContent_.find(key) != stashedContent_.end(); | 
|  | } | 
|  |  | 
|  | // ! Get Entry Date | 
|  | std::chrono::system_clock::time_point FlowFileImpl::getEntryDate() const { | 
|  | return entry_date_; | 
|  | } | 
|  | std::chrono::system_clock::time_point FlowFileImpl::getEventTime() const { | 
|  | return event_time_; | 
|  | } | 
|  | // ! Get Lineage Start Date | 
|  | std::chrono::system_clock::time_point FlowFileImpl::getlineageStartDate() const { | 
|  | return lineage_start_date_; | 
|  | } | 
|  |  | 
|  | const std::vector<utils::Identifier>& FlowFileImpl::getlineageIdentifiers() const { | 
|  | return lineage_Identifiers_; | 
|  | } | 
|  |  | 
|  | std::vector<utils::Identifier>& FlowFileImpl::getlineageIdentifiers() { | 
|  | return lineage_Identifiers_; | 
|  | } | 
|  |  | 
|  | bool FlowFileImpl::getAttribute(std::string_view key, std::string& value) const { | 
|  | const auto attribute = getAttribute(key); | 
|  | if (!attribute) { | 
|  | return false; | 
|  | } | 
|  | value = attribute.value(); | 
|  | return true; | 
|  | } | 
|  |  | 
|  | std::optional<std::string> FlowFileImpl::getAttribute(std::string_view key) const { | 
|  | auto it = attributes_.find(key); | 
|  | if (it != attributes_.end()) { | 
|  | return it->second; | 
|  | } | 
|  | return std::nullopt; | 
|  | } | 
|  |  | 
|  | // Get Size | 
|  | uint64_t FlowFileImpl::getSize() const { | 
|  | return size_; | 
|  | } | 
|  | // ! Get Offset | 
|  | uint64_t FlowFileImpl::getOffset() const { | 
|  | return offset_; | 
|  | } | 
|  |  | 
|  | bool FlowFileImpl::removeAttribute(std::string_view key) { | 
|  | auto it = attributes_.find(key); | 
|  | if (it != attributes_.end()) { | 
|  | attributes_.erase(it); | 
|  | return true; | 
|  | } else { | 
|  | return false; | 
|  | } | 
|  | } | 
|  |  | 
|  | bool FlowFileImpl::updateAttribute(std::string_view key, const std::string& value) { | 
|  | auto it = attributes_.find(key); | 
|  | if (it != attributes_.end()) { | 
|  | it->second = value; | 
|  | return true; | 
|  | } else { | 
|  | return false; | 
|  | } | 
|  | } | 
|  |  | 
|  | bool FlowFileImpl::addAttribute(std::string_view key, const std::string& value) { | 
|  | auto it = attributes_.find(key); | 
|  | if (it != attributes_.end()) { | 
|  | // attribute already there in the map | 
|  | return false; | 
|  | } else { | 
|  | attributes_[key] = value; | 
|  | return true; | 
|  | } | 
|  | } | 
|  |  | 
|  | void FlowFileImpl::setLineageStartDate(const std::chrono::system_clock::time_point date) { | 
|  | lineage_start_date_ = date; | 
|  | } | 
|  |  | 
|  | /** | 
|  | * Sets the original connection with a shared pointer. | 
|  | * @param connection shared connection. | 
|  | */ | 
|  | void FlowFileImpl::setConnection(core::Connectable* connection) { | 
|  | connection_ = connection; | 
|  | } | 
|  |  | 
|  | /** | 
|  | * Returns the original connection referenced by this record. | 
|  | * @return shared original connection pointer. | 
|  | */ | 
|  | core::Connectable* FlowFileImpl::getConnection() const { | 
|  | return connection_; | 
|  | } | 
|  |  | 
|  | } /* namespace core */ | 
|  |  | 
|  | namespace utils { | 
|  | template struct assert_container<core::FlowFile::AttributeMap>; | 
|  | } /* namespace utils */ | 
|  |  | 
|  | }  // namespace org::apache::nifi::minifi |