blob: 423713236f0fe26179fd92156d16557b91c6207f [file] [log] [blame]
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "core/Connectable.h"
#include <utility>
#include <memory>
#include <string>
#include <set>
#include "core/logging/LoggerFactory.h"
namespace org::apache::nifi::minifi::core {
ConnectableImpl::ConnectableImpl(const std::string_view name, const utils::Identifier &uuid)
: CoreComponentImpl(name, uuid),
max_concurrent_tasks_(1),
connectable_version_(nullptr),
logger_(logging::LoggerFactory<Connectable>::getLogger(uuid_)) {
}
ConnectableImpl::ConnectableImpl(const std::string_view name)
: CoreComponentImpl(name),
max_concurrent_tasks_(1),
connectable_version_(nullptr),
logger_(logging::LoggerFactory<Connectable>::getLogger(uuid_)) {
}
// Defaulted destructor, defined here in the implementation file.
ConnectableImpl::~ConnectableImpl() = default;
/**
 * Replaces the set of supported relationships.
 *
 * The call is rejected with a warning (and nothing is changed) while the
 * connectable is running. Otherwise the existing entries are discarded under
 * relationship_mutex_ and one entry per definition is registered, keyed by the
 * definition's name.
 *
 * @param relationships definitions to register as supported relationships
 */
void ConnectableImpl::setSupportedRelationships(std::span<const core::RelationshipDefinition> relationships) {
  if (isRunning()) {
    logger_->log_warn("Cannot set processor supported relationship while the process {} is running", name_);
    return;
  }

  std::lock_guard guard{relationship_mutex_};
  relationships_.clear();
  for (const auto& definition : relationships) {
    // emplace keeps the first entry if the same name appears more than once.
    relationships_.emplace(definition.name, definition);
    logger_->log_debug("Processor {} supported relationship name {}", name_, definition.name);
  }
}
std::vector<Relationship> ConnectableImpl::getSupportedRelationships() const {
std::vector<Relationship> relationships;
relationships.reserve(relationships_.size());
for (auto const &item : relationships_) {
relationships.push_back(item.second);
}
return relationships;
}
/**
 * Tells whether a relationship with the given name is registered as supported.
 *
 * @param relationship relationship whose name is looked up
 * @return true if relationships_ contains an entry for relationship.getName()
 */
bool ConnectableImpl::isSupportedRelationship(const core::Relationship &relationship) {
  // setSupportedRelationships() refuses to modify relationships_ while the
  // connectable is running, so the mutex is only acquired when it is not running.
  std::unique_lock<std::mutex> guard;
  if (!isRunning()) {
    guard = std::unique_lock<std::mutex>(relationship_mutex_);
  }
  return relationships_.contains(relationship.getName());
}
/**
 * Registers a single relationship as auto-terminated.
 *
 * The call is rejected with a warning (and nothing is changed) while the
 * connectable is running. Otherwise the relationship is stored under its name,
 * replacing any entry already present for that name.
 *
 * @param relationship relationship to mark as auto-terminated
 */
void ConnectableImpl::addAutoTerminatedRelationship(const core::Relationship& relationship) {
  if (isRunning()) {
    logger_->log_warn("Can not add processor auto terminated relationship while the process {} is running", name_);
    return;
  }

  std::lock_guard guard{relationship_mutex_};
  auto_terminated_relationships_.insert_or_assign(relationship.getName(), relationship);
}
/**
 * Replaces the set of auto-terminated relationships.
 *
 * The call is rejected with a warning (and nothing is changed) while the
 * connectable is running. Otherwise the existing entries are discarded under
 * relationship_mutex_ and every given relationship is stored under its name;
 * a later relationship with the same name overwrites an earlier one.
 *
 * @param relationships relationships to mark as auto-terminated
 */
void ConnectableImpl::setAutoTerminatedRelationships(std::span<const core::Relationship> relationships) {
  if (isRunning()) {
    logger_->log_warn("Can not set processor auto terminated relationship while the process {} is running", name_);
    return;
  }

  std::lock_guard guard{relationship_mutex_};
  auto_terminated_relationships_.clear();
  for (const auto& relationship : relationships) {
    auto_terminated_relationships_.insert_or_assign(relationship.getName(), relationship);
    logger_->log_debug("Processor {} auto terminated relationship name {}", name_, relationship.getName());
  }
}
/**
 * Tells whether a relationship with the given name is marked as auto-terminated.
 *
 * @param relationship relationship whose name is looked up
 * @return true if auto_terminated_relationships_ contains an entry for relationship.getName()
 */
bool ConnectableImpl::isAutoTerminated(const core::Relationship &relationship) {
  // addAutoTerminatedRelationship() and setAutoTerminatedRelationships() refuse to
  // modify auto_terminated_relationships_ while the connectable is running, so the
  // mutex is only acquired when it is not running.
  std::unique_lock<std::mutex> guard;
  if (!isRunning()) {
    guard = std::unique_lock<std::mutex>(relationship_mutex_);
  }
  return auto_terminated_relationships_.contains(relationship.getName());
}
void ConnectableImpl::waitForWork(std::chrono::milliseconds timeout) {
has_work_.store(isWorkAvailable());
if (!has_work_.load()) {
std::unique_lock<std::mutex> lock(work_available_mutex_);
work_condition_.wait_for(lock, timeout, [&] {return has_work_.load();});
}
}
void ConnectableImpl::notifyWork() {
// Do nothing if we are not event-driven
if (strategy_ != EVENT_DRIVEN) {
return;
}
{
has_work_.store(isWorkAvailable());
if (has_work_.load()) {
work_condition_.notify_one();
}
}
}
/**
 * Looks up the connectables attached to the given outgoing relationship.
 *
 * @param relationship name of the relationship to look up
 * @return a copy of the set stored for that relationship if one exists;
 *         otherwise a set containing only this object when the name is
 *         "__self__"; otherwise an empty set
 */
std::set<Connectable*> ConnectableImpl::getOutGoingConnections(const std::string &relationship) {
  if (const auto found = outgoing_connections_.find(relationship); found != outgoing_connections_.end()) {
    return found->second;
  }
  // A registered entry takes precedence; "__self__" is only consulted when the lookup fails.
  if (relationship == "__self__") {
    return {this};
  }
  return {};
}
/**
 * Returns the next incoming connection in rotation.
 *
 * Acquires relationship_mutex_ and delegates to getNextIncomingConnectionImpl().
 *
 * @return the value returned by getNextIncomingConnectionImpl()
 */
Connectable* ConnectableImpl::getNextIncomingConnection() {
  std::lock_guard guard{relationship_mutex_};
  return getNextIncomingConnectionImpl(guard);
}
/**
 * Returns the connection under the rotating cursor and advances the cursor.
 *
 * The lock_guard parameter is unused in the body; it makes the caller present
 * a held lock in order to call this function.
 *
 * @return nullptr when there are no incoming connections; otherwise the
 *         connection the cursor pointed at before it was advanced
 */
Connectable* ConnectableImpl::getNextIncomingConnectionImpl(const std::lock_guard<std::mutex>& /*relationship_mutex_lock*/) {
  if (incoming_connections_.empty()) {
    return nullptr;
  }

  // Moves the cursor back to the first element whenever it sits at end().
  const auto wrap_cursor = [this] {
    if (incoming_connections_Iter == incoming_connections_.end()) {
      incoming_connections_Iter = incoming_connections_.begin();
    }
  };

  wrap_cursor();
  Connectable* const selected = *incoming_connections_Iter;
  ++incoming_connections_Iter;
  wrap_cursor();
  return selected;
}
/**
 * Picks the incoming connection to use next.
 *
 * This implementation simply forwards to getNextIncomingConnection().
 *
 * @return the value returned by getNextIncomingConnection()
 */
Connectable* ConnectableImpl::pickIncomingConnection() {
  return this->getNextIncomingConnection();
}
} // namespace org::apache::nifi::minifi::core