blob: d9c3b1b9d99b2f96b1d24791aa19c7b77153b924 [file] [log] [blame]
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <memory>
#include "../../include/core/state/nodes/QueueMetrics.h"
#include "../../include/core/state/nodes/RepositoryMetrics.h"
#include "unit/TestBase.h"
#include "unit/Catch.h"
#include "core/Processor.h"
#include "core/ClassLoader.h"
#include "repository/VolatileContentRepository.h"
#include "unit/ProvenanceTestHelper.h"
#include "unit/DummyProcessor.h"
#include "range/v3/algorithm/find_if.hpp"
#include "unit/SingleProcessorTestController.h"
#include "core/ProcessorMetrics.h"
#include "unit/TestUtils.h"
#include "Connection.h"
using namespace std::literals::chrono_literals;
namespace org::apache::nifi::minifi::test {
// Test helper: requires that `children` contains a node whose name equals `name`
// and that this node's value, rendered with to_string(), equals `expected_value`.
// Both conditions are checked with REQUIRE, so a missing node aborts the calling
// test case before the value is inspected.
void checkSerializedValue(const std::vector<org::apache::nifi::minifi::state::response::SerializedResponseNode>& children, const std::string& name, const std::string& expected_value) {
  const auto has_requested_name = [&name](const auto& node) { return node.name == name; };
  const auto match = ranges::find_if(children, has_requested_name);
  REQUIRE(match != children.end());
  REQUIRE(expected_value == match->value.to_string());
}
// A QueueMetrics node that was never given a connection must still report its
// name and must serialize to an empty list.
TEST_CASE("QueueMetricsTestNoConnections", "[c2m2]") {
  minifi::state::response::QueueMetrics queue_metrics;
  REQUIRE(queue_metrics.getName() == "QueueMetrics");

  const auto serialized = queue_metrics.serialize();
  REQUIRE(serialized.empty());
}
// Registers a single connection with QueueMetrics and verifies the serialized
// node: it is named after the connection and exposes the current/maximum data
// size and the current/maximum queued count.
TEST_CASE("QueueMetricsTestConnections", "[c2m3]") {
  minifi::state::response::QueueMetrics metrics;
  REQUIRE("QueueMetrics" == metrics.getName());

  // Repositories required to construct the connection under test.
  std::shared_ptr<minifi::Configure> configuration = std::make_shared<minifi::ConfigureImpl>();
  std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
  content_repo->initialize(configuration);
  std::shared_ptr<core::Repository> repo = std::make_shared<TestRepository>();

  // Both backpressure thresholds are set to 1024; they are expected to show up
  // as "datasizemax" and "queuedmax" below.
  auto connection = std::make_unique<minifi::ConnectionImpl>(repo, content_repo, "testconnection");
  connection->setBackpressureThresholdDataSize(1024);
  connection->setBackpressureThresholdCount(1024);
  metrics.updateConnection(connection.get());

  // Serialize exactly once so the size check and the content checks inspect
  // the same snapshot.
  const auto serialized_metrics = metrics.serialize();
  REQUIRE(1 == serialized_metrics.size());
  const auto& resp = serialized_metrics.at(0);
  REQUIRE("testconnection" == resp.name);
  REQUIRE(4 == resp.children.size());
  checkSerializedValue(resp.children, "datasize", "0");
  checkSerializedValue(resp.children, "datasizemax", "1024");
  checkSerializedValue(resp.children, "queued", "0");
  checkSerializedValue(resp.children, "queuedmax", "1024");
}
// A RepositoryMetrics node without any registered repository must still report
// its name and must serialize to an empty list.
TEST_CASE("RepositorymetricsNoRepo", "[c2m4]") {
  minifi::state::response::RepositoryMetrics repository_metrics;
  REQUIRE(repository_metrics.getName() == "RepositoryMetrics");

  const auto serialized = repository_metrics.serialize();
  REQUIRE(serialized.empty());
}
// Registers one repository with RepositoryMetrics and verifies the serialized
// node before start(), after start() and after stop(). Only the "running" field
// is expected to change between the three phases. The test runs once per
// SECTION: the RocksDB variant is expected to report two additional metrics.
TEST_CASE("RepositorymetricsHaveRepo", "[c2m4]") {
  minifi::state::response::RepositoryMetrics metrics;
  REQUIRE("RepositoryMetrics" == metrics.getName());

  std::shared_ptr<TestThreadedRepository> repo;
  size_t expected_metric_count{};
  SECTION("Non-RocksDB repository") {
    repo = std::make_shared<TestThreadedRepository>();
    expected_metric_count = 5;
  }
  SECTION("RocksDB repository") {
    repo = std::make_shared<TestRocksDbRepository>();
    expected_metric_count = 7;
  }
  metrics.addRepository(repo);

  // Serializes the metrics once and checks every field of the single repository
  // node; `expected_running` is the only value that differs between phases.
  const auto verify_serialized_metrics = [&](const std::string& expected_running) {
    const auto serialized = metrics.serialize();
    REQUIRE(1 == serialized.size());
    const auto& resp = serialized.at(0);
    REQUIRE("repo_name" == resp.name);
    REQUIRE(expected_metric_count == resp.children.size());
    checkSerializedValue(resp.children, "running", expected_running);
    checkSerializedValue(resp.children, "full", "false");
    checkSerializedValue(resp.children, "size", "0");
    checkSerializedValue(resp.children, "maxSize", "0");
    checkSerializedValue(resp.children, "entryCount", "0");
    // More than the five common metrics means the RocksDB section is active.
    if (expected_metric_count > 5) {
      checkSerializedValue(resp.children, "rocksDbTableReadersSize", "100");
      checkSerializedValue(resp.children, "rocksDbAllMemoryTablesSize", "200");
    }
  };

  verify_serialized_metrics("false");

  repo->start();
  verify_serialized_metrics("true");

  repo->stop();
  verify_serialized_metrics("false");
}
// Verifies the metrics of a TestVolatileRepository before and after setFull():
// "full" flips to true, "size" reaches the reported "maxSize" and "entryCount"
// becomes 10000. "maxSize" is expected to be 75% of
// TEST_MAX_REPOSITORY_STORAGE_SIZE in both phases.
TEST_CASE("VolatileRepositorymetricsCanBeFull", "[c2m4]") {
  minifi::state::response::RepositoryMetrics metrics;
  REQUIRE("RepositoryMetrics" == metrics.getName());

  auto repo = std::make_shared<TestVolatileRepository>();
  metrics.addRepository(repo);

  // Computed once; used for "maxSize" in both phases and for "size" once full.
  const std::string expected_max_size = std::to_string(static_cast<int64_t>(TEST_MAX_REPOSITORY_STORAGE_SIZE * 0.75));

  {
    // Serialize once so all assertions of this phase look at the same snapshot.
    const auto serialized = metrics.serialize();
    REQUIRE(1 == serialized.size());
    const auto& resp = serialized.at(0);
    REQUIRE("repo_name" == resp.name);
    REQUIRE(5 == resp.children.size());
    checkSerializedValue(resp.children, "running", "false");
    checkSerializedValue(resp.children, "full", "false");
    checkSerializedValue(resp.children, "size", "0");
    checkSerializedValue(resp.children, "maxSize", expected_max_size);
    checkSerializedValue(resp.children, "entryCount", "0");
  }

  repo->setFull();

  {
    const auto serialized = metrics.serialize();
    REQUIRE(1 == serialized.size());
    const auto& resp = serialized.at(0);
    REQUIRE("repo_name" == resp.name);
    REQUIRE(5 == resp.children.size());
    checkSerializedValue(resp.children, "running", "false");
    checkSerializedValue(resp.children, "full", "true");
    checkSerializedValue(resp.children, "size", expected_max_size);
    checkSerializedValue(resp.children, "maxSize", expected_max_size);
    checkSerializedValue(resp.children, "entryCount", "10000");
  }
}
// Exercises the last/average onTrigger runtime bookkeeping of ProcessorMetricsImpl.
// The expected averages are consistent with a mean over the ten most recent
// samples, e.g. (10 + 20 + 30 + 7 * 50) / 10 = 41 and (9 * 40 + 10) / 10 = 37.
TEST_CASE("Test on trigger runtime processor metrics", "[ProcessorMetrics]") {
  DummyProcessor dummy_processor("dummy");
  minifi::core::ProcessorMetricsImpl metrics(dummy_processor);
  REQUIRE("DummyProcessorMetrics" == metrics.getName());

  // Records the same runtime `repetitions` times in a row.
  const auto record_repeatedly = [&metrics](int repetitions, auto runtime) {
    for (int remaining = repetitions; remaining > 0; --remaining) {
      metrics.addLastOnTriggerRuntime(runtime);
    }
  };

  // Nothing recorded yet: both values start at zero.
  REQUIRE(metrics.getLastOnTriggerRuntime() == 0ms);
  REQUIRE(metrics.getAverageOnTriggerRuntime() == 0ms);

  metrics.addLastOnTriggerRuntime(10ms);
  metrics.addLastOnTriggerRuntime(20ms);
  metrics.addLastOnTriggerRuntime(30ms);
  REQUIRE(metrics.getLastOnTriggerRuntime() == 30ms);
  REQUIRE(metrics.getAverageOnTriggerRuntime() == 20ms);

  // Ten samples in total: 10, 20, 30 and seven times 50.
  record_repeatedly(7, 50ms);
  REQUIRE(metrics.getAverageOnTriggerRuntime() == 41ms);
  REQUIRE(metrics.getLastOnTriggerRuntime() == 50ms);

  // Three more 50 ms samples; the average is now expected to be exactly 50 ms.
  record_repeatedly(3, 50ms);
  REQUIRE(metrics.getAverageOnTriggerRuntime() == 50ms);
  REQUIRE(metrics.getLastOnTriggerRuntime() == 50ms);

  record_repeatedly(10, 40ms);
  REQUIRE(metrics.getAverageOnTriggerRuntime() == 40ms);
  REQUIRE(metrics.getLastOnTriggerRuntime() == 40ms);

  // A single short sample after ten 40 ms samples.
  metrics.addLastOnTriggerRuntime(10ms);
  REQUIRE(metrics.getLastOnTriggerRuntime() == 10ms);
  REQUIRE(metrics.getAverageOnTriggerRuntime() == 37ms);
}
// Exercises the last/average session commit runtime bookkeeping of
// ProcessorMetricsImpl. The expected averages are consistent with a mean over
// the ten most recent samples, e.g. (10 + 20 + 30 + 7 * 50) / 10 = 41 and
// (9 * 40 + 10) / 10 = 37.
TEST_CASE("Test commit runtime processor metrics", "[ProcessorMetrics]") {
  DummyProcessor dummy_processor("dummy");
  minifi::core::ProcessorMetricsImpl metrics(dummy_processor);
  REQUIRE("DummyProcessorMetrics" == metrics.getName());

  // Records the same commit runtime `repetitions` times in a row.
  const auto record_repeatedly = [&metrics](int repetitions, auto runtime) {
    for (int remaining = repetitions; remaining > 0; --remaining) {
      metrics.addLastSessionCommitRuntime(runtime);
    }
  };

  // Nothing recorded yet: both values start at zero.
  REQUIRE(metrics.getLastSessionCommitRuntime() == 0ms);
  REQUIRE(metrics.getAverageSessionCommitRuntime() == 0ms);

  metrics.addLastSessionCommitRuntime(10ms);
  metrics.addLastSessionCommitRuntime(20ms);
  metrics.addLastSessionCommitRuntime(30ms);
  REQUIRE(metrics.getLastSessionCommitRuntime() == 30ms);
  REQUIRE(metrics.getAverageSessionCommitRuntime() == 20ms);

  // Ten samples in total: 10, 20, 30 and seven times 50.
  record_repeatedly(7, 50ms);
  REQUIRE(metrics.getAverageSessionCommitRuntime() == 41ms);
  REQUIRE(metrics.getLastSessionCommitRuntime() == 50ms);

  // Three more 50 ms samples; the average is now expected to be exactly 50 ms.
  record_repeatedly(3, 50ms);
  REQUIRE(metrics.getAverageSessionCommitRuntime() == 50ms);
  REQUIRE(metrics.getLastSessionCommitRuntime() == 50ms);

  record_repeatedly(10, 40ms);
  REQUIRE(metrics.getAverageSessionCommitRuntime() == 40ms);
  REQUIRE(metrics.getLastSessionCommitRuntime() == 40ms);

  // A single short sample after ten 40 ms samples.
  metrics.addLastSessionCommitRuntime(10ms);
  REQUIRE(metrics.getLastSessionCommitRuntime() == 10ms);
  REQUIRE(metrics.getAverageSessionCommitRuntime() == 37ms);
}
// Test-only processor. For every incoming flow file it creates one new flow
// file whose content is the incoming content written twice (write + append),
// routes the new flow file to "success" and the incoming one to "original".
// NOTE(review): the Description string below speaks of "two more" flow files,
// while onTrigger creates a single new one with doubled content - confirm which
// is intended.
class DuplicateContentProcessor : public minifi::core::ProcessorImpl {
 public:
  using ProcessorImpl::ProcessorImpl;

  static constexpr const char* Description = "A processor that creates two more of the same flow file.";
  static constexpr auto Properties = std::array<core::PropertyReference, 0>{};
  static constexpr auto Success = core::RelationshipDefinition{"success", "Newly created FlowFiles"};
  static constexpr auto Original = core::RelationshipDefinition{"original", "Original FlowFile"};
  static constexpr auto Relationships = std::array{Success, Original};
  static constexpr bool SupportsDynamicProperties = false;
  static constexpr bool SupportsDynamicRelationships = false;
  static constexpr core::annotation::Input InputRequirement = core::annotation::Input::INPUT_REQUIRED;
  static constexpr bool IsSingleThreaded = false;

  // Declares the two relationships this processor can route to.
  void initialize() override {
    setSupportedRelationships(Relationships);
  }

  // Reads the whole incoming flow file into memory and emits the duplicate.
  // Does nothing when the session has no flow file to offer.
  void onTrigger(core::ProcessContext& /*context*/, core::ProcessSession& session) override {
    auto incoming = session.get();
    if (!incoming) {
      return;
    }

    auto duplicate = session.create();

    // Pull the complete incoming content into a local buffer.
    std::vector<std::byte> content;
    session.read(incoming, [&content](const std::shared_ptr<io::InputStream>& input) -> int64_t {
      content.resize(input->size());
      return gsl::narrow<int64_t>(input->read(content));
    });

    // The same callback is used for the initial write and for the append, so
    // the duplicate ends up holding the content twice.
    const auto emit_content = [&content](const std::shared_ptr<io::OutputStream>& output) -> int64_t {
      return gsl::narrow<int64_t>(output->write(content));
    };
    session.write(duplicate, emit_content);
    session.append(duplicate, emit_content);

    session.transfer(duplicate, Success);
    session.transfer(incoming, Original);
  }

  ADD_COMMON_VIRTUAL_FUNCTIONS_FOR_PROCESSORS
};
// Triggers DuplicateContentProcessor twice and checks that the processor's
// metrics accumulate across triggers. Per trigger the processor routes the
// input to "original" and one new flow file with doubled content to "success",
// so an N-byte input adds N read bytes, 2N written bytes and 3N transferred bytes.
TEST_CASE("Test processor metrics change after trigger", "[ProcessorMetrics]") {
  minifi::test::SingleProcessorTestController test_controller(
      minifi::test::utils::make_processor<DuplicateContentProcessor>("DuplicateContentProcessor"));

  // First trigger: "log line 1" is 10 bytes long.
  test_controller.trigger({minifi::test::InputFlowFileData{"log line 1", {}}});
  auto processor_metrics = test_controller.getProcessor()->getMetrics();
  CHECK(processor_metrics->invocations() == 1);
  CHECK(processor_metrics->incomingFlowFiles() == 1);
  CHECK(processor_metrics->transferredFlowFiles() == 2);
  CHECK(processor_metrics->getTransferredFlowFilesToRelationshipCount("success") == 1);
  CHECK(processor_metrics->getTransferredFlowFilesToRelationshipCount("original") == 1);
  CHECK(processor_metrics->incomingBytes() == 10);
  CHECK(processor_metrics->transferredBytes() == 30);
  CHECK(processor_metrics->bytesRead() == 10);
  CHECK(processor_metrics->bytesWritten() == 20);
  // Remember the processing time so the second trigger can be shown to add to it.
  auto nanos_after_first_trigger = processor_metrics->processingNanos().load();
  CHECK(processor_metrics->processingNanos() > 0);

  // Second trigger: "new log line 2" is 14 bytes long; all counters are cumulative.
  test_controller.trigger({minifi::test::InputFlowFileData{"new log line 2", {}}});
  CHECK(processor_metrics->invocations() == 2);
  CHECK(processor_metrics->incomingFlowFiles() == 2);
  CHECK(processor_metrics->transferredFlowFiles() == 4);
  CHECK(processor_metrics->getTransferredFlowFilesToRelationshipCount("success") == 2);
  CHECK(processor_metrics->getTransferredFlowFilesToRelationshipCount("original") == 2);
  CHECK(processor_metrics->incomingBytes() == 24);
  CHECK(processor_metrics->transferredBytes() == 72);
  CHECK(processor_metrics->bytesRead() == 24);
  CHECK(processor_metrics->bytesWritten() == 48);
  CHECK(processor_metrics->processingNanos() > nanos_after_first_trigger);
}
} // namespace org::apache::nifi::minifi::test