blob: fe2e3db77bfa6548d13f5317393a059722f0dd40 [file] [log] [blame]
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <filesystem>
#include <memory>
#include <map>
#include "c2/C2MetricsPublisher.h"
#include "minifi-cpp/core/state/nodes/MetricsBase.h"
#include "core/state/nodes/QueueMetrics.h"
#include "core/state/nodes/AgentInformation.h"
#include "core/state/nodes/ConfigurationChecksums.h"
#include "core/state/nodes/RepositoryMetrics.h"
#include "properties/Configure.h"
#include "core/state/UpdateController.h"
#include "c2/C2Agent.h"
#include "core/state/nodes/FlowInformation.h"
#include "properties/Configuration.h"
#include "utils/file/FileSystem.h"
#include "utils/file/FileUtils.h"
#include "minifi-cpp/utils/gsl.h"
#include "utils/StringUtils.h"
#include "core/Resource.h"
namespace org::apache::nifi::minifi::c2 {
void C2MetricsPublisher::loadNodeClasses(const std::string& class_definitions, const state::response::SharedResponseNode& new_node) {
gsl_Expects(response_node_loader_);
auto classes = utils::string::splitAndTrimRemovingEmpty(class_definitions, ",");
std::unordered_set<std::string> unique_classes{classes.begin(), classes.end()};
for (const std::string& clazz : unique_classes) {
auto response_nodes = response_node_loader_->loadResponseNodes(clazz);
if (response_nodes.empty()) {
continue;
}
for (const auto& response_node : response_nodes) {
dynamic_cast<state::response::ObjectNode*>(new_node.get())->add_node(response_node);
}
}
}
void C2MetricsPublisher::loadC2ResponseConfiguration(const std::string &prefix) {
gsl_Expects(configuration_);
std::string class_definitions;
if (!configuration_->get(prefix, class_definitions)) {
return;
}
auto classes = utils::string::splitAndTrimRemovingEmpty(class_definitions, ",");
std::unordered_set<std::string> unique_classes{classes.begin(), classes.end()};
for (const std::string& metricsClass : unique_classes) {
try {
std::string option = utils::string::join_pack(prefix, ".", metricsClass);
std::string classOption = option + ".classes";
std::string nameOption = option + ".name";
std::string name;
if (!configuration_->get(nameOption, name)) {
continue;
}
state::response::SharedResponseNode new_node = gsl::make_not_null(std::make_shared<state::response::ObjectNode>(name));
if (configuration_->get(classOption, class_definitions)) {
loadNodeClasses(class_definitions, new_node);
} else {
std::string optionName = utils::string::join_pack(option, ".", name);
loadC2ResponseConfiguration(optionName, new_node);
}
// We don't need to lock here, we already do it in the loadMetricNodes member function
root_response_nodes_[name].push_back(new_node);
} catch (const std::exception& ex) {
logger_->log_error("Could not create metrics class {}, exception type: {}, what: {}", metricsClass, typeid(ex).name(), ex.what());
} catch (...) {
logger_->log_error("Could not create metrics class {}, exception type: {}", metricsClass, getCurrentExceptionTypeName());
}
}
}
state::response::SharedResponseNode C2MetricsPublisher::loadC2ResponseConfiguration(const std::string &prefix,
state::response::SharedResponseNode prev_node) {
gsl_Expects(configuration_);
std::string class_definitions;
if (!configuration_->get(prefix, class_definitions)) {
return prev_node;
}
auto classes = utils::string::splitAndTrimRemovingEmpty(class_definitions, ",");
std::unordered_set<std::string> unique_classes{classes.begin(), classes.end()};
for (const std::string& metricsClass : unique_classes) {
try {
std::string option = utils::string::join_pack(prefix, ".", metricsClass);
std::string classOption = option + ".classes";
std::string nameOption = option + ".name";
std::string name;
if (!configuration_->get(nameOption, name)) {
continue;
}
state::response::SharedResponseNode new_node = gsl::make_not_null(std::make_shared<state::response::ObjectNode>(name));
if (name.find(',') != std::string::npos) {
auto sub_classes = utils::string::splitAndTrimRemovingEmpty(name, ",");
std::unordered_set<std::string> unique_sub_classes{sub_classes.begin(), sub_classes.end()};
for (const std::string& subClassStr : unique_sub_classes) {
auto node = loadC2ResponseConfiguration(subClassStr, prev_node);
dynamic_cast<state::response::ObjectNode*>(prev_node.get())->add_node(node);
}
} else {
if (configuration_->get(classOption, class_definitions)) {
loadNodeClasses(class_definitions, new_node);
if (!new_node->isEmpty()) {
dynamic_cast<state::response::ObjectNode*>(prev_node.get())->add_node(new_node);
}
} else {
std::string optionName = utils::string::join_pack(option, ".", name);
auto sub_node = loadC2ResponseConfiguration(optionName, new_node);
dynamic_cast<state::response::ObjectNode*>(prev_node.get())->add_node(sub_node);
}
}
} catch (const std::exception& ex) {
logger_->log_error("Could not create metrics class {}, exception type: {}, what: {}", metricsClass, typeid(ex).name(), ex.what());
} catch (...) {
logger_->log_error("Could not create metrics class {}, exception type: {}", metricsClass, getCurrentExceptionTypeName());
}
}
return prev_node;
}
std::optional<state::response::NodeReporter::ReportedNode> C2MetricsPublisher::getMetricsNode(const std::string& metrics_class) const {
gsl_Expects(response_node_loader_);
std::lock_guard<std::mutex> guard{metrics_mutex_};
const auto createReportedNode = [](const std::vector<state::response::SharedResponseNode>& nodes) {
gsl_Expects(!nodes.empty());
state::response::NodeReporter::ReportedNode reported_node;
reported_node.is_array = nodes[0]->isArray();
reported_node.name = nodes[0]->getName();
reported_node.serialized_nodes = state::response::ResponseNode::serializeAndMergeResponseNodes(nodes);
return reported_node;
};
if (!metrics_class.empty()) {
auto metrics_nodes = response_node_loader_->loadResponseNodes(metrics_class);
if (!metrics_nodes.empty()) {
return createReportedNode(metrics_nodes);
}
} else {
const auto metrics_it = root_response_nodes_.find("metrics");
if (metrics_it != root_response_nodes_.end()) {
return createReportedNode(metrics_it->second);
}
}
return std::nullopt;
}
std::vector<state::response::NodeReporter::ReportedNode> C2MetricsPublisher::getHeartbeatNodes(bool include_manifest) const {
gsl_Expects(configuration_);
bool full_heartbeat = configuration_->get(minifi::Configuration::nifi_c2_full_heartbeat)
| utils::andThen(utils::string::toBool)
| utils::valueOrElse([] {return true;});
bool include = include_manifest || full_heartbeat;
std::vector<state::response::NodeReporter::ReportedNode> reported_nodes;
std::lock_guard<std::mutex> guard{metrics_mutex_};
reported_nodes.reserve(root_response_nodes_.size());
for (const auto& [name, node_values] : root_response_nodes_) {
for (const auto& node : node_values) {
auto identifier = dynamic_cast<state::response::AgentIdentifier*>(node.get());
if (identifier) {
identifier->includeAgentManifest(include);
}
state::response::NodeReporter::ReportedNode reported_node;
reported_node.name = node->getName();
reported_node.is_array = node->isArray();
reported_node.serialized_nodes = node->serialize();
reported_nodes.push_back(reported_node);
}
}
return reported_nodes;
}
void C2MetricsPublisher::loadMetricNodes() {
gsl_Expects(response_node_loader_ && configuration_);
if (!root_response_nodes_.empty()) {
return;
}
std::string class_csv;
std::lock_guard<std::mutex> guard{metrics_mutex_};
if (configuration_->get(minifi::Configuration::nifi_c2_root_classes, class_csv)) {
auto classes = utils::string::splitAndTrimRemovingEmpty(class_csv, ",");
std::unordered_set<std::string> unique_classes{classes.begin(), classes.end()};
for (const std::string& clazz : unique_classes) {
auto response_nodes = response_node_loader_->loadResponseNodes(clazz);
if (response_nodes.empty()) {
continue;
}
for (auto&& response_node: response_nodes) {
root_response_nodes_[response_node->getName()].push_back(std::move(response_node));
}
}
}
loadC2ResponseConfiguration(Configuration::nifi_c2_root_class_definitions);
}
void C2MetricsPublisher::clearMetricNodes() {
std::lock_guard<std::mutex> guard{metrics_mutex_};
root_response_nodes_.clear();
}
state::response::NodeReporter::ReportedNode C2MetricsPublisher::getAgentManifest() {
gsl_Expects(response_node_loader_);
return response_node_loader_->getAgentManifest();
}
REGISTER_RESOURCE(C2MetricsPublisher, DescriptionOnly);
} // namespace org::apache::nifi::minifi::c2