blob: 1aad28e5e1adf383991bb2a15adf3a3066676a56 [file] [log] [blame]
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <string>
#include <iostream>
#include <filesystem>
#include <algorithm>
#include "unit/TestBase.h"
#include "integration/HTTPIntegrationBase.h"
#include "integration/HTTPHandlers.h"
#include "processors/TailFile.h"
#include "state/ProcessorController.h"
#include "utils/file/FileUtils.h"
#include "unit/TestUtils.h"
#include "processors/GetTCP.h"
#include "utils/StringUtils.h"
#include "utils/file/PathUtils.h"
#include "unit/Catch.h"
using namespace std::literals::chrono_literals;
namespace org::apache::nifi::minifi::test {
/**
 * Test harness that configures logging for the C2 components and then waits until the
 * externally owned flag reports that the updated metrics have been verified.
 */
class VerifyC2Metrics : public VerifyC2Base {
 public:
  explicit VerifyC2Metrics(const std::atomic_bool& metrics_updated_successfully)
      : metrics_updated_successfully_(metrics_updated_successfully) {}

  // Adjusts the log levels of the classes involved in the test, then delegates to the base class setup.
  void testSetup() override {
    auto& log_controller = LogTestController::getInstance();
    log_controller.setTrace<minifi::c2::C2Agent>();
    log_controller.setDebug<minifi::c2::RESTSender>();
    log_controller.setDebug<minifi::FlowController>();
    log_controller.setOff<minifi::processors::GetTCP>();
    VerifyC2Base::testSetup();
  }

  // Polls the shared flag once per second for up to 40 seconds and fails the test if it never becomes true.
  void runAssertions() override {
    auto metrics_were_updated = [this] { return metrics_updated_successfully_.load(); };
    REQUIRE(org::apache::nifi::minifi::test::utils::verifyEventHappenedInPollTime(40s, metrics_were_updated, 1s));
  }

 private:
  // Read-only view of the flag; this class never writes it.
  const std::atomic_bool& metrics_updated_successfully_;
};
/**
 * Heartbeat handler that walks through three states:
 *  1. VERIFY_INITIAL_METRICS: validates the metrics of incoming heartbeats until they match the initial flow,
 *  2. SEND_NEW_CONFIG: answers the next heartbeat with a configuration UPDATE carrying the replacement flow,
 *  3. VERIFY_UPDATED_METRICS: validates the metrics of incoming heartbeats against the replacement flow and
 *     sets the shared flag once they match.
 */
class MetricsHandler: public HeartbeatHandler {
 public:
  /**
   * @param metrics_updated_successfully flag set to true once the updated metrics have been verified
   * @param configuration forwarded to the HeartbeatHandler base class
   * @param replacement_config_path file whose content is sent as the new configuration
   */
  explicit MetricsHandler(std::atomic_bool& metrics_updated_successfully, std::shared_ptr<minifi::Configure> configuration, const std::filesystem::path& replacement_config_path)
    : HeartbeatHandler(std::move(configuration)),
      metrics_updated_successfully_(metrics_updated_successfully),
      replacement_config_(minifi::utils::file::get_content(replacement_config_path.string())) {
  }

  // Dispatches the heartbeat according to the current test state; every branch sends exactly one response.
  void handleHeartbeat(const rapidjson::Document& root, struct mg_connection* conn) override {
    switch (test_state_) {
      case TestState::VERIFY_INITIAL_METRICS: {
        verifyMetrics(root);
        sendEmptyHeartbeatResponse(conn);
        break;
      }
      case TestState::SEND_NEW_CONFIG: {
        sendHeartbeatResponse("UPDATE", "configuration", "889348", conn, {{"configuration_data", minifi::c2::C2Value{replacement_config_}}});
        test_state_ = TestState::VERIFY_UPDATED_METRICS;
        break;
      }
      case TestState::VERIFY_UPDATED_METRICS: {
        verifyUpdatedMetrics(root);
        sendEmptyHeartbeatResponse(conn);
        break;
      }
    }
  }

 private:
  enum class TestState {
    VERIFY_INITIAL_METRICS,
    SEND_NEW_CONFIG,
    VERIFY_UPDATED_METRICS
  };

  static constexpr const char* GETTCP_UUID = "2438e3c8-015a-1000-79ca-83af40ec1991";
  static constexpr const char* LOGATTRIBUTE1_UUID = "2438e3c8-015a-1000-79ca-83af40ec1992";
  static constexpr const char* LOGATTRIBUTE2_UUID = "5128e3c8-015a-1000-79ca-83af40ec1990";
  static constexpr const char* GENERATE_FLOWFILE_UUID = "4fe2d51d-076a-49b0-88de-5cf5adf52b8f";

  // Replies with HTTP 200 and an empty body.
  static void sendEmptyHeartbeatResponse(struct mg_connection* conn) {
    mg_printf(conn, "HTTP/1.1 200 OK\r\nContent-Type: text/plain\r\nContent-Length: 0\r\nConnection: close\r\n\r\n");
  }

  // Advances to SEND_NEW_CONFIG once a heartbeat carries valid runtime, load and processor metrics of the initial flow.
  void verifyMetrics(const rapidjson::Document& root) {
    if (!root.HasMember("metrics")) {
      return;
    }
    const auto& metrics = root["metrics"];
    const bool initial_metrics_verified =
      metrics.HasMember("RuntimeMetrics") &&
      metrics.HasMember("LoadMetrics") &&
      metrics.HasMember("ProcessorMetrics") &&
      verifyRuntimeMetrics(metrics["RuntimeMetrics"]) &&
      verifyLoadMetrics(metrics["LoadMetrics"]) &&
      verifyProcessorMetrics(metrics["ProcessorMetrics"]);
    if (initial_metrics_verified) {
      test_state_ = TestState::SEND_NEW_CONFIG;
    }
  }

  // Sets the shared flag once a heartbeat carries valid metrics of the replacement flow; "ProcessorMetrics" must be absent by then.
  void verifyUpdatedMetrics(const rapidjson::Document& root) {
    if (!root.HasMember("metrics")) {
      return;
    }
    const auto& metrics = root["metrics"];
    const bool updated_metrics_verified =
      metrics.HasMember("RuntimeMetrics") &&
      metrics.HasMember("LoadMetrics") &&
      !metrics.HasMember("ProcessorMetrics") &&
      verifyUpdatedRuntimeMetrics(metrics["RuntimeMetrics"]) &&
      verifyUpdatedLoadMetrics(metrics["LoadMetrics"]);
    if (updated_metrics_verified) {
      metrics_updated_successfully_ = true;
    }
  }

  // Checks a single entry of "processorStatuses".
  // The cumulative counters are read with GetInt64(): GetInt() only accepts values that fit in a 32-bit int,
  // and e.g. processingNanos exceeds that after roughly 2.1 seconds of accumulated processing time.
  static bool processorMetricsAreValid(const auto& processor) {
    return processor["bytesRead"].GetInt64() >= 0 &&
      processor["bytesWritten"].GetInt64() >= 0 &&
      processor["flowFilesIn"].GetInt64() >= 0 &&
      processor["flowFilesOut"].GetInt64() >= 0 &&
      processor["bytesIn"].GetInt64() >= 0 &&
      processor["bytesOut"].GetInt64() >= 0 &&
      processor["invocations"].GetInt64() >= 0 &&
      processor["processingNanos"].GetInt64() >= 0 &&
      processor["activeThreadCount"].GetInt() == -1 &&
      processor["terminatedThreadCount"].GetInt() == -1 &&
      processor["runStatus"].GetString() == std::string("RUNNING");
  }

  // Checks the runtime metric nodes that are expected for both flows; queue_id is the key expected under flowInfo.queues.
  static bool verifyCommonRuntimeMetricNodes(const rapidjson::Value& runtime_metrics, const std::string& queue_id) {
    return runtime_metrics.HasMember("deviceInfo") &&
      runtime_metrics["deviceInfo"]["systemInfo"].HasMember("operatingSystem") &&
      runtime_metrics["deviceInfo"]["networkInfo"].HasMember("hostname") &&
      runtime_metrics.HasMember("flowInfo") &&
      runtime_metrics["flowInfo"].HasMember("flowId") &&
      runtime_metrics["flowInfo"].HasMember("runStatus") &&
      runtime_metrics["flowInfo"]["runStatus"].GetString() == std::string("RUNNING") &&
      runtime_metrics["flowInfo"].HasMember("versionedFlowSnapshotURI") &&
      runtime_metrics["flowInfo"].HasMember("queues") &&
      runtime_metrics["flowInfo"]["queues"].HasMember(queue_id) &&
      runtime_metrics.HasMember("agentInfo") &&
      runtime_metrics["agentInfo"]["status"]["repositories"]["ff"].HasMember("size") &&
      runtime_metrics["flowInfo"].HasMember("processorStatuses");
  }

  /**
   * Checks that flowInfo.processorStatuses holds exactly two entries with valid metrics.
   * Must only be called after verifyCommonRuntimeMetricNodes() succeeded, which guarantees the node exists.
   * @throws std::runtime_error if an entry has an id other than the two expected ones
   */
  static bool verifyProcessorStatuses(const rapidjson::Value& runtime_metrics, const char* first_expected_id, const char* second_expected_id) {
    const auto processor_statuses = runtime_metrics["flowInfo"]["processorStatuses"].GetArray();
    if (processor_statuses.Size() != 2) {
      return false;
    }
    return std::all_of(processor_statuses.begin(), processor_statuses.end(), [&](const auto& processor) {
      const std::string processor_id = processor["id"].GetString();
      if (processor_id != first_expected_id && processor_id != second_expected_id) {
        throw std::runtime_error("Unexpected processor id in processorStatuses: " + processor_id);
      }
      return processorMetricsAreValid(processor);
    });
  }

  // Runtime metrics of the initial flow: the statuses must belong to the GetTCP and the first LogAttribute processor.
  static bool verifyRuntimeMetrics(const rapidjson::Value& runtime_metrics) {
    return verifyCommonRuntimeMetricNodes(runtime_metrics, "2438e3c8-015a-1000-79ca-83af40ec1997") &&
      verifyProcessorStatuses(runtime_metrics, GETTCP_UUID, LOGATTRIBUTE1_UUID);
  }

  // Runtime metrics of the replacement flow: the statuses must belong to the GenerateFlowFile and the second LogAttribute processor.
  static bool verifyUpdatedRuntimeMetrics(const rapidjson::Value& runtime_metrics) {
    return verifyCommonRuntimeMetricNodes(runtime_metrics, "8368e3c8-015a-1003-52ca-83af40ec1332") &&
      verifyProcessorStatuses(runtime_metrics, GENERATE_FLOWFILE_UUID, LOGATTRIBUTE2_UUID);
  }

  // Checks the load metric nodes that are expected for both flows; queue_name is the key expected under QueueMetrics.
  static bool verifyCommonLoadMetricNodes(const rapidjson::Value& load_metrics, const char* queue_name) {
    return load_metrics.HasMember("RepositoryMetrics") &&
      load_metrics.HasMember("QueueMetrics") &&
      load_metrics["RepositoryMetrics"].HasMember("ff") &&
      load_metrics["RepositoryMetrics"].HasMember("repo_name") &&
      load_metrics["QueueMetrics"].HasMember(queue_name);
  }

  static bool verifyLoadMetrics(const rapidjson::Value& load_metrics) {
    return verifyCommonLoadMetricNodes(load_metrics, "GetTCP/success/LogAttribute");
  }

  // In addition to the common nodes, the queue of the replacement flow must report a positive "queued" value.
  static bool verifyUpdatedLoadMetrics(const rapidjson::Value& load_metrics) {
    return verifyCommonLoadMetricNodes(load_metrics, "GenerateFlowFile/success/LogAttribute") &&
      std::stoi(load_metrics["QueueMetrics"]["GenerateFlowFile/success/LogAttribute"]["queued"].GetString()) > 0;
  }

  // Checks the GetTCP entry of "ProcessorMetrics": at least one onTrigger invocation and all remaining members present.
  static bool verifyProcessorMetrics(const rapidjson::Value& processor_metrics) {
    if (!processor_metrics.HasMember("GetTCPMetrics") || !processor_metrics["GetTCPMetrics"].HasMember(GETTCP_UUID)) {
      return false;
    }
    const auto& gettcp_metrics = processor_metrics["GetTCPMetrics"][GETTCP_UUID];
    return gettcp_metrics.HasMember("OnTriggerInvocations") &&
      gettcp_metrics["OnTriggerInvocations"].GetUint() > 0 &&
      gettcp_metrics.HasMember("TransferredFlowFiles") &&
      gettcp_metrics.HasMember("AverageOnTriggerRunTime") &&
      gettcp_metrics.HasMember("LastOnTriggerRunTime") &&
      gettcp_metrics.HasMember("TransferredBytes") &&
      gettcp_metrics.HasMember("IncomingFlowFiles") &&
      gettcp_metrics.HasMember("IncomingBytes") &&
      gettcp_metrics.HasMember("BytesRead") &&
      gettcp_metrics.HasMember("BytesWritten") &&
      gettcp_metrics.HasMember("ProcessingNanos");
  }

  std::atomic_bool& metrics_updated_successfully_;
  TestState test_state_ = TestState::VERIFY_INITIAL_METRICS;
  std::string replacement_config_;
};
// Configures the C2 metrics class definitions, registers a MetricsHandler at the heartbeat URL and runs the
// harness with TestC2Metrics.yml; TestC2MetricsUpdate.yml is handed to the handler as the replacement config.
TEST_CASE("C2MetricsTest", "[c2test]") {
  std::atomic_bool metrics_updated_successfully{false};
  VerifyC2Metrics harness(metrics_updated_successfully);

  const auto configuration = harness.getConfiguration();
  configuration->set("nifi.c2.root.classes", "FlowInformation,AgentInformation");
  configuration->set("nifi.c2.root.class.definitions", "metrics");
  configuration->set("nifi.c2.root.class.definitions.metrics.name", "metrics");
  configuration->set("nifi.c2.root.class.definitions.metrics.metrics", "runtimemetrics,loadmetrics,processorMetrics");
  configuration->set("nifi.c2.root.class.definitions.metrics.metrics.runtimemetrics.name", "RuntimeMetrics");
  // NOTE(review): DeviceInfoNode is listed twice below; kept as-is, confirm whether that is intentional.
  configuration->set("nifi.c2.root.class.definitions.metrics.metrics.runtimemetrics.classes", "DeviceInfoNode,FlowInformation,AssetInformation,DeviceInfoNode,AgentInformation");
  configuration->set("nifi.c2.root.class.definitions.metrics.metrics.loadmetrics.name", "LoadMetrics");
  configuration->set("nifi.c2.root.class.definitions.metrics.metrics.loadmetrics.classes", "QueueMetrics,RepositoryMetrics");
  configuration->set("nifi.c2.root.class.definitions.metrics.metrics.processorMetrics.name", "ProcessorMetrics");
  configuration->set("nifi.c2.root.class.definitions.metrics.metrics.processorMetrics.classes", "processorMetrics/GetTCP.*");

  // Both paths are built from the resource directory directly; substituting text in the full path string
  // would also rewrite any directory component that happens to contain "TestC2Metrics".
  const auto resource_dir = std::filesystem::path(TEST_RESOURCES);
  const auto test_file_path = resource_dir / "TestC2Metrics.yml";
  const auto replacement_path = resource_dir / "TestC2MetricsUpdate.yml";

  MetricsHandler handler(metrics_updated_successfully, configuration, replacement_path);
  harness.setUrl("https://localhost:0/api/heartbeat", &handler);
  harness.run(test_file_path);
}
} // namespace org::apache::nifi::minifi::test