blob: ca910f96e1d541b50ef363ac2449871776524c2b [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "RootProcessGroupWrapper.h"
namespace org::apache::nifi::minifi {
// Forwards a property update to the current root process group.
// Without a root there is nothing to update, so the call is a no-op.
void RootProcessGroupWrapper::updatePropertyValue(const std::string& processor_name, const std::string& property_name, const std::string& property_value) {
  if (!root_) {
    return;
  }
  root_->updatePropertyValue(processor_name, property_name, property_value);
}
// Returns the name reported by the root process group,
// or an empty string when no root is set.
std::string RootProcessGroupWrapper::getName() const {
  if (!root_) {
    return {};
  }
  return root_->getName();
}
// Returns the UUID of the root process group,
// or a default-constructed identifier when no root is set.
utils::Identifier RootProcessGroupWrapper::getComponentUUID() const {
  if (root_ != nullptr) {
    return root_->getUUID();
  }
  return utils::Identifier{};
}
// Returns the root process group's version rendered as a decimal string,
// or "0" when no root is set.
std::string RootProcessGroupWrapper::getVersion() const {
  return root_ ? std::to_string(root_->getVersion()) : std::string{"0"};
}
// Installs new_root as the active root process group.
// The previously active root is moved into backup_root_.
// An empty new_root is rejected with an error log and nothing is changed.
void RootProcessGroupWrapper::setNewRoot(std::unique_ptr<core::ProcessGroup> new_root) {
  if (new_root == nullptr) {
    logger_->log_error("New flow to be set was empty!");
    return;
  }

  // detach the observers from the outgoing root before it is swapped out
  if (metrics_publisher_store_) {
    metrics_publisher_store_->clearMetricNodes();
  }
  if (controller_socket_protocol_) {
    controller_socket_protocol_->setRoot(nullptr);
  }

  backup_root_ = std::move(root_);
  root_ = std::move(new_root);
  // the cached controllers were created for processors of the previous root
  processor_to_controller_.clear();

  // re-attach the observers to the freshly installed root
  auto* const active_root = root_.get();
  if (metrics_publisher_store_) {
    metrics_publisher_store_->loadMetricNodes(active_root);
  }
  if (controller_socket_protocol_) {
    controller_socket_protocol_->setRoot(active_root);
  }
}
void RootProcessGroupWrapper::restoreBackup() {
if (metrics_publisher_store_) { metrics_publisher_store_->clearMetricNodes(); }
root_ = std::move(backup_root_);
processor_to_controller_.clear();
if (metrics_publisher_store_) { metrics_publisher_store_->loadMetricNodes(root_.get()); }
}
// Discards the backup root, if there is one.
void RootProcessGroupWrapper::clearBackup() {
  backup_root_.reset();
}
// Stops the flow in two phases: first the processors without incoming connections,
// then, after an optional drain period, every remaining processor.
// Does nothing when no root is set.
void RootProcessGroupWrapper::stopProcessing(TimerDrivenSchedulingAgent& timer_scheduler,
                                             EventDrivenSchedulingAgent& event_scheduler,
                                             CronDrivenSchedulingAgent& cron_scheduler) {
  if (root_ == nullptr) {
    return;
  }

  // phase one: only the source processors are stopped
  const auto is_source_processor = [] (const core::Processor* proc) -> bool {
    return !proc->hasIncomingConnections();
  };
  root_->stopProcessing(timer_scheduler, event_scheduler, cron_scheduler, is_source_processor);

  // The timeout is re-read from the configuration on every iteration, so that C2 can
  // progressively increase it when waiting a little longer could allow the
  // FlowFiles to be processed.
  const auto drain_start = std::chrono::steady_clock::now();
  for (;;) {
    const auto waited = std::chrono::steady_clock::now() - drain_start;
    const auto allowed = loadShutdownTimeoutFromConfiguration().value_or(std::chrono::milliseconds{0});
    if (waited >= allowed) {
      break;
    }
    if (root_->getTotalFlowFileCount() == 0) {
      break;
    }
    std::this_thread::sleep_for(shutdown_check_interval_);
  }

  // phase two: shut down all other processors as well
  root_->stopProcessing(timer_scheduler, event_scheduler, cron_scheduler);
}
// Asks the root process group to drain its connections; no-op without a root.
void RootProcessGroupWrapper::drainConnections() {
  if (!root_) {
    return;
  }
  root_->drainConnections();
}
// Lets the root process group fill connectionMap.
// The map is left untouched when no root is set.
void RootProcessGroupWrapper::getConnections(std::map<std::string, core::Connectable*>& connectionMap) {
  if (!root_) {
    return;
  }
  root_->getConnections(connectionMap);
}
// Lets the root process group fill containers.
// The map is left untouched when no root is set.
void RootProcessGroupWrapper::getFlowFileContainers(std::map<std::string, core::Connectable*>& containers) const {
  if (!root_) {
    return;
  }
  root_->getFlowFileContainers(containers);
}
// Starts processing on the root process group with the given scheduling agents.
// Returns true when a root was present and startProcessing was invoked on it,
// false when there is no root.
bool RootProcessGroupWrapper::startProcessing(TimerDrivenSchedulingAgent& timer_scheduler,
                                              EventDrivenSchedulingAgent& event_scheduler,
                                              CronDrivenSchedulingAgent& cron_scheduler) {
  if (root_) {
    root_->startProcessing(timer_scheduler, event_scheduler, cron_scheduler);
    return true;
  }
  return false;
}
void RootProcessGroupWrapper::clearConnection(const std::string &connection) {
if (root_ == nullptr) {
return;
}
logger_->log_info("Attempting to clear connection {}", connection);
std::map<std::string, Connection*> connections;
root_->getConnections(connections);
auto conn = connections.find(connection);
if (conn != connections.end()) {
logger_->log_info("Clearing connection {}", connection);
conn->second->drain(true);
}
}
// Collects a controller for every processor reported by the root process group.
// Controllers are cached in processor_to_controller_ keyed by processor UUID; a
// missing one is created through controllerFactory and stored in the cache.
// Returns std::nullopt when no root is set; the returned pointers are non-owning.
std::optional<std::vector<state::StateController*>> RootProcessGroupWrapper::getAllProcessorControllers(
    const std::function<gsl::not_null<std::unique_ptr<state::ProcessorController>>(core::Processor&)>& controllerFactory) {
  if (root_ == nullptr) {
    return std::nullopt;
  }

  std::vector<core::Processor*> processors;
  root_->getAllProcessors(processors);

  std::vector<state::StateController*> controllers;
  for (core::Processor* const proc : processors) {
    // slot of the existing controller, or an empty slot to be filled right below
    auto& cached = processor_to_controller_[proc->getUUID()];
    if (!cached) {
      cached = controllerFactory(*proc);
    }
    controllers.push_back(cached.get());
  }
  return controllers;
}
// Looks up the controller of a single processor, identified either by id or by name.
//
// id_or_name: first parsed as a utils::Identifier and used for a lookup by id;
//   a lookup by name with the raw string serves as the alternative.
// controllerFactory: used to create the controller when processor_to_controller_
//   does not hold one for the processor yet; the new controller is stored there.
//
// Returns nullptr when no root is set, or (after logging an error) when the
// processor was not found; otherwise a non-owning pointer to the controller.
//
// NOTE(review): the exact chaining semantics of utils::andThen / orElse / transform /
// valueOrElse are defined outside this file; the step comments below assume the usual
// optional-pipeline meaning - confirm against the utils headers.
state::StateController* RootProcessGroupWrapper::getProcessorController(const std::string& id_or_name,
const std::function<gsl::not_null<std::unique_ptr<state::ProcessorController>>(core::Processor&)>& controllerFactory) {
if (!root_) {
return nullptr;
}
return utils::Identifier::parse(id_or_name)
// when parsing produced an identifier: search the root for a processor with that id
| utils::andThen([this](utils::Identifier id) { return utils::optional_from_ptr(root_->findProcessorById(id)); })
// otherwise (presumably also when the id lookup found nothing): search by name
| utils::orElse([this, &id_or_name] { return utils::optional_from_ptr(root_->findProcessorByName(id_or_name)); })
// map the found processor to its controller
| utils::transform([this, &controllerFactory](gsl::not_null<core::Processor*> proc) -> gsl::not_null<state::ProcessorController*> {
// use the controller already stored for this processor's UUID, if there is one ...
return utils::optional_from_ptr(processor_to_controller_[proc->getUUID()].get())
| utils::valueOrElse([this, proc, &controllerFactory] {
// ... otherwise create one through the factory and store it under the UUID
return gsl::make_not_null((processor_to_controller_[proc->getUUID()] = controllerFactory(*proc)).get());
});
})
// no processor was found by either lookup: log it and report nullptr
| utils::valueOrElse([this, &id_or_name]() -> state::ProcessorController* {
logger_->log_error("Could not get processor controller for requested id/name \"{}\", because the processor was not found", id_or_name);
return nullptr;
});
}
// Reads the Configure::nifi_flowcontroller_drain_timeout setting and feeds its value
// through parsing::parseDuration. The result is an optional duration that callers
// treat as "no timeout configured" when it is empty.
std::optional<std::chrono::milliseconds> RootProcessGroupWrapper::loadShutdownTimeoutFromConfiguration() {
  const auto to_duration = [] (const auto& s) {
    return parsing::parseDuration(s) | utils::toOptional();
  };
  return configuration_->get(Configure::nifi_flowcontroller_drain_timeout) | utils::andThen(to_duration);
}
} // namespace org::apache::nifi::minifi