blob: d583cf60c21192d73b87a28509feb28a0425669b [file]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "../PushGrafanaLokiGrpc.h"
#include "MockGrafanaLokiGrpc.h"
#include "unit/SingleProcessorTestController.h"
#include "unit/Catch.h"
#include "utils/StringUtils.h"
#include "unit/TestUtils.h"
namespace org::apache::nifi::minifi::extensions::grafana::loki::test {
// Triggering the processor with an empty Url is expected to throw minifi::Exception.
TEST_CASE("Url property is required", "[PushGrafanaLokiGrpc]") {
  minifi::test::SingleProcessorTestController controller(minifi::test::utils::make_processor<PushGrafanaLokiGrpc>("PushGrafanaLokiGrpc"));
  auto processor = controller.getProcessor();
  const auto& plan = controller.plan;

  // Only the Url is left empty; the remaining properties carry usable-looking values.
  plan->setProperty(processor, PushGrafanaLokiGrpc::Url, "");
  plan->setProperty(processor, PushGrafanaLokiGrpc::StreamLabels, "job=minifi,directory=/opt/minifi/logs/");
  plan->setProperty(processor, PushGrafanaLokiGrpc::LogLineBatchSize, "1");

  REQUIRE_THROWS_AS(controller.trigger(), minifi::Exception);
}
// Triggering must throw minifi::Exception when StreamLabels is either empty or not a list of key=value pairs.
TEST_CASE("Valid stream labels need to be set", "[PushGrafanaLokiGrpc]") {
  minifi::test::SingleProcessorTestController controller(minifi::test::utils::make_processor<PushGrafanaLokiGrpc>("PushGrafanaLokiGrpc"));
  auto processor = controller.getProcessor();
  const auto& plan = controller.plan;

  plan->setProperty(processor, PushGrafanaLokiGrpc::Url, "localhost:10991");
  plan->setProperty(processor, PushGrafanaLokiGrpc::LogLineBatchSize, "1");

  // Each section supplies one kind of unusable StreamLabels value.
  SECTION("Stream labels cannot be empty") {
    plan->setProperty(processor, PushGrafanaLokiGrpc::StreamLabels, "");
  }

  SECTION("Stream labels need to be valid") {
    plan->setProperty(processor, PushGrafanaLokiGrpc::StreamLabels, "invalidlabels,invalidlabels2");
  }

  REQUIRE_THROWS_AS(controller.trigger(), minifi::Exception);
}
// Triggering with Log Line Batch Size set to "0" is expected to throw minifi::Exception.
TEST_CASE("Log Line Batch Size cannot be 0", "[PushGrafanaLokiGrpc]") {
  minifi::test::SingleProcessorTestController controller(minifi::test::utils::make_processor<PushGrafanaLokiGrpc>("PushGrafanaLokiGrpc"));
  auto processor = controller.getProcessor();
  const auto& plan = controller.plan;

  // The Url and StreamLabels assignments are asserted to succeed; the batch size assignment is not checked here.
  CHECK(plan->setProperty(processor, PushGrafanaLokiGrpc::Url, "localhost:10991"));
  CHECK(plan->setProperty(processor, PushGrafanaLokiGrpc::StreamLabels, "job=minifi,directory=/opt/minifi/logs/"));
  plan->setProperty(processor, PushGrafanaLokiGrpc::LogLineBatchSize, "0");

  REQUIRE_THROWS_AS(controller.trigger(), minifi::Exception);
}
/**
 * Fixture shared by the TEST_CASE_METHOD tests of PushGrafanaLokiGrpc.
 *
 * The constructor creates a MockGrafanaLokiGrpc with the argument "10991" (presumably the port the mock listens
 * on - confirm against MockGrafanaLokiGrpc.h), wraps a PushGrafanaLokiGrpc processor in a
 * SingleProcessorTestController, raises the log level of the involved classes and configures the processor with
 * Url "localhost:10991" and a fixed StreamLabels value. The verify* helpers compare what the mock reports as its
 * last request with the expectations of a test.
 */
class PushGrafanaLokiGrpcTestFixture {
 public:
  PushGrafanaLokiGrpcTestFixture()
      : mock_loki_("10991"),
        test_controller_(minifi::test::utils::make_processor<PushGrafanaLokiGrpc>("PushGrafanaLokiGrpc")),
        push_grafana_loki_grpc_(test_controller_.getProcessor()) {
    LogTestController::getInstance().setDebug<TestPlan>();
    LogTestController::getInstance().setDebug<minifi::core::Processor>();
    LogTestController::getInstance().setTrace<minifi::core::ProcessSession>();
    LogTestController::getInstance().setTrace<PushGrafanaLokiGrpc>();
    CHECK(test_controller_.plan->setProperty(push_grafana_loki_grpc_, PushGrafanaLokiGrpc::Url, "localhost:10991"));
    CHECK(test_controller_.plan->setProperty(push_grafana_loki_grpc_, PushGrafanaLokiGrpc::StreamLabels, "job=minifi,directory=/opt/minifi/logs/"));
  }

  /// Sets `property` to `property_value` on the processor under test and CHECKs that the plan accepted it.
  void setProperty(const auto& property, const std::string& property_value) {
    CHECK(test_controller_.plan->setProperty(push_grafana_loki_grpc_, property, property_value));
  }

  /// Requires that the last request reported by the mock contains no entries.
  void verifyLastRequestIsEmpty() {
    REQUIRE(mock_loki_.getLastRequest().entries.empty());
  }

  /// Requires that the last tenant id reported by the mock equals `tenant_id`.
  void verifyTenantId(const std::string& tenant_id) {
    REQUIRE(mock_loki_.getLastTenantId() == tenant_id);
  }

  /// Checks that the last request carries the stream labels configured in the constructor, in the mock's string form.
  void verifyStreamLabels() {
    const auto request = mock_loki_.getLastRequest();
    CHECK(request.stream_labels == "{directory=\"/opt/minifi/logs/\", job=\"minifi\"}");
  }

  /**
   * Verifies the entries of the last request reported by the mock.
   *
   * @param start_timestamp lower bound every entry timestamp is checked against
   * @param expected_log_values expected log lines, compared position by position with the request entries
   * @param expected_log_line_attribute_values expected per-entry labels; when empty (the default) labels are not verified
   */
  void verifySentRequestToLoki(uint64_t start_timestamp, const std::vector<std::string>& expected_log_values,
      const std::vector<std::map<std::string, std::string>>& expected_log_line_attribute_values = {}) {
    const auto request = mock_loki_.getLastRequest();

    // REQUIRE rather than CHECK: the loop below indexes the expected vectors by entry position, so carrying on
    // after a size mismatch would read past the end of a vector instead of reporting a clean test failure.
    REQUIRE(request.entries.size() == expected_log_values.size());
    const bool verify_attributes = !expected_log_line_attribute_values.empty();
    if (verify_attributes) {
      REQUIRE(request.entries.size() == expected_log_line_attribute_values.size());
    }

    for (size_t i = 0; i < request.entries.size(); ++i) {
      CHECK(start_timestamp <= request.entries[i].timestamp);
      CHECK(request.entries[i].line == expected_log_values[i]);
      if (verify_attributes) {
        REQUIRE(request.entries[i].labels.size() == expected_log_line_attribute_values[i].size());
        CHECK(request.entries[i].labels == expected_log_line_attribute_values[i]);
      }
    }
  }

  /**
   * Checks that `flow_files` has as many elements as `expected_log_values` and that the content of every flow file
   * is one of the expected values. The order of the flow files is not compared.
   */
  void verifyTransferredFlowContent(const std::vector<std::shared_ptr<core::FlowFile>>& flow_files, const std::vector<std::string>& expected_log_values) const {
    CHECK(flow_files.size() == expected_log_values.size());
    for (const auto& flow_file : flow_files) {
      CHECK(std::find(expected_log_values.begin(), expected_log_values.end(), test_controller_.plan->getContent(flow_file)) != expected_log_values.end());
    }
  }

 protected:
  // Declared in this order on purpose: members are initialized in declaration order, so the mock is constructed
  // before the controller, and the processor wrapper is obtained from the already constructed controller.
  MockGrafanaLokiGrpc mock_loki_;
  minifi::test::SingleProcessorTestController test_controller_;
  TypedProcessorWrapper<PushGrafanaLokiGrpc> push_grafana_loki_grpc_;
};
// With both batch sizes set to 1, only the first of two offered log lines is expected in the last request and in Success.
TEST_CASE_METHOD(PushGrafanaLokiGrpcTestFixture, "PushGrafanaLokiGrpc should send 1 log line to Grafana Loki in a trigger", "[PushGrafanaLokiGrpc]") {
  using minifi::test::InputFlowFileData;

  // Nanoseconds since the epoch, taken before the trigger; used as the lower bound for the entry timestamps.
  const uint64_t test_start_ns = std::chrono::system_clock::now().time_since_epoch() / std::chrono::nanoseconds(1);

  setProperty(PushGrafanaLokiGrpc::LogLineBatchSize, "1");
  setProperty(PushGrafanaLokiGrpc::MaxBatchSize, "1");

  const auto trigger_result = test_controller_.trigger({InputFlowFileData{"孫子兵法", {}}, InputFlowFileData{"Война и мир", {}}});

  verifyStreamLabels();
  const std::vector<std::string> sent_lines = {"孫子兵法"};
  verifySentRequestToLoki(test_start_ns, sent_lines);
  verifyTransferredFlowContent(trigger_result.at(PushGrafanaLokiGrpc::Success), sent_lines);
}
// Nothing may be sent while fewer lines than Log Line Batch Size were received; the fourth line completes the batch.
TEST_CASE_METHOD(PushGrafanaLokiGrpcTestFixture, "PushGrafanaLokiGrpc should wait for Log Line Batch Size limit to be reached", "[PushGrafanaLokiGrpc]") {
  using minifi::test::InputFlowFileData;

  // Nanoseconds since the epoch, taken before the first trigger; lower bound for the entry timestamps.
  const uint64_t test_start_ns = std::chrono::system_clock::now().time_since_epoch() / std::chrono::nanoseconds(1);

  setProperty(PushGrafanaLokiGrpc::LogLineBatchSize, "4");
  setProperty(PushGrafanaLokiGrpc::MaxBatchSize, "2");

  // First trigger: three lines against a batch size of four, so neither Success nor the mock may see anything yet.
  auto trigger_result = test_controller_.trigger({InputFlowFileData{"log line 1", {}}, InputFlowFileData{"log line 2", {}}, InputFlowFileData{"log line 3", {}}});
  CHECK(trigger_result.at(PushGrafanaLokiGrpc::Success).empty());
  verifyLastRequestIsEmpty();

  // Second trigger: the fourth line arrives and all four lines are expected to be sent and transferred.
  trigger_result = test_controller_.trigger({InputFlowFileData{"log line 4", {}}});
  verifyStreamLabels();
  const std::vector<std::string> all_lines = {"log line 1", "log line 2", "log line 3", "log line 4"};
  verifySentRequestToLoki(test_start_ns, all_lines);
  verifyTransferredFlowContent(trigger_result.at(PushGrafanaLokiGrpc::Success), all_lines);
}
// Five lines are offered with batch size 2 and max batch size 4: the last request must hold lines 3 and 4,
// while lines 1 to 4 are expected in Success.
TEST_CASE_METHOD(PushGrafanaLokiGrpcTestFixture, "Multiple batches are sent in a single trigger", "[PushGrafanaLokiGrpc]") {
  using minifi::test::InputFlowFileData;

  // Nanoseconds since the epoch, taken before the trigger; lower bound for the entry timestamps.
  const uint64_t test_start_ns = std::chrono::system_clock::now().time_since_epoch() / std::chrono::nanoseconds(1);

  setProperty(PushGrafanaLokiGrpc::LogLineBatchSize, "2");
  setProperty(PushGrafanaLokiGrpc::MaxBatchSize, "4");

  const auto trigger_result = test_controller_.trigger({InputFlowFileData{"log line 1", {}}, InputFlowFileData{"log line 2", {}}, InputFlowFileData{"log line 3", {}},
      InputFlowFileData{"log line 4", {}}, InputFlowFileData{"log line 5", {}}});

  verifyStreamLabels();
  // The mock only exposes its last request, hence only the second batch is compared here.
  verifySentRequestToLoki(test_start_ns, {"log line 3", "log line 4"});
  const std::vector<std::string> transferred_lines = {"log line 1", "log line 2", "log line 3", "log line 4"};
  verifyTransferredFlowContent(trigger_result.at(PushGrafanaLokiGrpc::Success), transferred_lines);
}
// With an unusable Url, a completed batch is expected to end up in Failure and nothing in Success.
TEST_CASE_METHOD(PushGrafanaLokiGrpcTestFixture, "If submitting to Grafana Loki fails then the flow files should be transferred to failure", "[PushGrafanaLokiGrpc]") {
  using minifi::test::InputFlowFileData;

  setProperty(PushGrafanaLokiGrpc::Url, "http://invalid-url");
  setProperty(PushGrafanaLokiGrpc::LogLineBatchSize, "4");
  setProperty(PushGrafanaLokiGrpc::MaxBatchSize, "2");
  setProperty(PushGrafanaLokiGrpc::ConnectTimeout, "100 ms");

  // First trigger: three lines against a batch size of four, so no relationship may receive anything yet.
  auto trigger_result = test_controller_.trigger({InputFlowFileData{"log line 1", {}}, InputFlowFileData{"log line 2", {}}, InputFlowFileData{"log line 3", {}}});
  CHECK(trigger_result.at(PushGrafanaLokiGrpc::Success).empty());
  CHECK(trigger_result.at(PushGrafanaLokiGrpc::Failure).empty());

  // Second trigger: the fourth line arrives; all four lines are expected in Failure.
  trigger_result = test_controller_.trigger({InputFlowFileData{"log line 4", {}}});
  const std::vector<std::string> failed_lines = {"log line 1", "log line 2", "log line 3", "log line 4"};
  CHECK(trigger_result.at(PushGrafanaLokiGrpc::Success).empty());
  verifyTransferredFlowContent(trigger_result.at(PushGrafanaLokiGrpc::Failure), failed_lines);
}
// Without a Log Line Batch Size, a single trigger is expected to send the offered lines, limited by Max Batch Size when that is set.
TEST_CASE_METHOD(PushGrafanaLokiGrpcTestFixture, "If no log line batch limit is set, all log files in a single trigger should be processed", "[PushGrafanaLokiGrpc]") {
  using minifi::test::InputFlowFileData;

  // Nanoseconds since the epoch, taken before the trigger; lower bound for the entry timestamps.
  const uint64_t test_start_ns = std::chrono::system_clock::now().time_since_epoch() / std::chrono::nanoseconds(1);

  // Filled in by whichever section runs.
  std::vector<std::string> lines_expected;

  SECTION("Max Batch Size is set") {
    setProperty(PushGrafanaLokiGrpc::MaxBatchSize, "2");
    lines_expected = {"log line 1", "log line 2"};
  }

  SECTION("No Max Batch Size is set") {
    lines_expected = {"log line 1", "log line 2", "log line 3"};
  }

  const auto trigger_result = test_controller_.trigger({InputFlowFileData{"log line 1", {}}, InputFlowFileData{"log line 2", {}}, InputFlowFileData{"log line 3", {}}});

  verifyStreamLabels();
  verifySentRequestToLoki(test_start_ns, lines_expected);
  verifyTransferredFlowContent(trigger_result.at(PushGrafanaLokiGrpc::Success), lines_expected);
}
// Only flow file attributes named in Log Line Metadata Attributes are expected as labels on the sent entries.
TEST_CASE_METHOD(PushGrafanaLokiGrpcTestFixture, "Log line metadata can be added with flow file attributes", "[PushGrafanaLokiGrpc]") {
  using minifi::test::InputFlowFileData;

  // Nanoseconds since the epoch, taken before the trigger; lower bound for the entry timestamps.
  const uint64_t test_start_ns = std::chrono::system_clock::now().time_since_epoch() / std::chrono::nanoseconds(1);

  setProperty(PushGrafanaLokiGrpc::MaxBatchSize, "2");
  setProperty(PushGrafanaLokiGrpc::LogLineMetadataAttributes, "label1, label2, label3");

  // "label4" on the first flow file is not among the configured attribute names.
  const auto trigger_result = test_controller_.trigger({InputFlowFileData{"log line 1", {{"label1", "value1"}, {"label4", "value4"}}},
      InputFlowFileData{"log line 2", {{"label1", "value1"}, {"label2", "value2"}}}, InputFlowFileData{"log line 3", {}}});

  verifyStreamLabels();
  const std::vector<std::string> sent_lines = {"log line 1", "log line 2"};
  const std::vector<std::map<std::string, std::string>> labels_per_line = {{{"label1", "value1"}}, {{"label1", "value1"}, {"label2", "value2"}}};
  verifySentRequestToLoki(test_start_ns, sent_lines, labels_per_line);
  verifyTransferredFlowContent(trigger_result.at(PushGrafanaLokiGrpc::Success), sent_lines);
}
// The TenantID property value is expected to be the tenant id the mock reports after one log line was pushed.
TEST_CASE_METHOD(PushGrafanaLokiGrpcTestFixture, "Tenant ID can be set in properties", "[PushGrafanaLokiGrpc]") {
  using minifi::test::InputFlowFileData;

  setProperty(PushGrafanaLokiGrpc::LogLineBatchSize, "1");
  setProperty(PushGrafanaLokiGrpc::MaxBatchSize, "1");
  setProperty(PushGrafanaLokiGrpc::TenantID, "mytenant");

  // The trigger result itself is not inspected; only the tenant id recorded by the mock matters here.
  const auto trigger_result = test_controller_.trigger({InputFlowFileData{"log line 1", {}}});

  verifyTenantId("mytenant");
}
// With a Log Line Batch Wait of 200 ms, nothing may be sent by the first trigger; a trigger issued after
// sleeping 300 ms is expected to send the collected lines, with or without a new flow file.
TEST_CASE_METHOD(PushGrafanaLokiGrpcTestFixture, "PushGrafanaLokiGrpc should wait for Log Line Batch Wait time to be reached", "[PushGrafanaLokiGrpc]") {
  using minifi::test::InputFlowFileData;

  // Nanoseconds since the epoch, taken before the first trigger; lower bound for the entry timestamps.
  const uint64_t test_start_ns = std::chrono::system_clock::now().time_since_epoch() / std::chrono::nanoseconds(1);

  setProperty(PushGrafanaLokiGrpc::LogLineBatchWait, "200 ms");
  setProperty(PushGrafanaLokiGrpc::MaxBatchSize, "3");

  auto trigger_result = test_controller_.trigger({InputFlowFileData{"log line 1", {}}, InputFlowFileData{"log line 2", {}}, InputFlowFileData{"log line 3", {}}});
  verifyLastRequestIsEmpty();

  // Sleep longer than the configured batch wait before triggering again.
  std::this_thread::sleep_for(std::chrono::milliseconds(300));

  // Filled in by whichever section runs.
  std::vector<std::string> lines_expected;

  SECTION("Trigger with new flow file") {
    trigger_result = test_controller_.trigger({InputFlowFileData{"log line 4", {}}});
    lines_expected = {"log line 1", "log line 2", "log line 3", "log line 4"};
  }

  SECTION("Trigger without new flow file should also send the batch") {
    trigger_result = test_controller_.trigger(std::vector<InputFlowFileData>{});
    lines_expected = {"log line 1", "log line 2", "log line 3"};
  }

  verifyStreamLabels();
  verifySentRequestToLoki(test_start_ns, lines_expected);
  verifyTransferredFlowContent(trigger_result.at(PushGrafanaLokiGrpc::Success), lines_expected);
}
} // namespace org::apache::nifi::minifi::extensions::grafana::loki::test