| /** |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| #include "../PushGrafanaLokiREST.h" |
| #include "MockGrafanaLokiREST.h" |
| #include "unit/SingleProcessorTestController.h" |
| #include "unit/Catch.h" |
| #include "utils/StringUtils.h" |
| #include "unit/TestUtils.h" |
| |
| #ifdef WIN32 |
| #pragma push_macro("GetObject") |
| #undef GetObject // windows.h #defines GetObject = GetObjectA or GetObjectW, which conflicts with rapidjson |
| #endif |
| |
| namespace org::apache::nifi::minifi::extensions::grafana::loki::test { |
| |
| TEST_CASE("Url property is required", "[PushGrafanaLokiREST]") { |
| minifi::test::SingleProcessorTestController test_controller(minifi::test::utils::make_processor<PushGrafanaLokiREST>("PushGrafanaLokiREST")); |
| auto push_grafana_loki_rest = test_controller.getProcessor(); |
| |
| test_controller.plan->setProperty(push_grafana_loki_rest, PushGrafanaLokiREST::Url, ""); |
| test_controller.plan->setProperty(push_grafana_loki_rest, PushGrafanaLokiREST::StreamLabels, "job=minifi,directory=/opt/minifi/logs/"); |
| test_controller.plan->setProperty(push_grafana_loki_rest, PushGrafanaLokiREST::LogLineBatchSize, "1"); |
| REQUIRE_THROWS_AS(test_controller.trigger(), minifi::Exception); |
| } |
| |
| TEST_CASE("Valid stream labels need to be set", "[PushGrafanaLokiREST]") { |
| minifi::test::SingleProcessorTestController test_controller(minifi::test::utils::make_processor<PushGrafanaLokiREST>("PushGrafanaLokiREST")); |
| auto push_grafana_loki_rest = test_controller.getProcessor(); |
| test_controller.plan->setProperty(push_grafana_loki_rest, PushGrafanaLokiREST::Url, "localhost:10990"); |
| test_controller.plan->setProperty(push_grafana_loki_rest, PushGrafanaLokiREST::LogLineBatchSize, "1"); |
| SECTION("Stream labels cannot be empty") { |
| test_controller.plan->setProperty(push_grafana_loki_rest, PushGrafanaLokiREST::StreamLabels, ""); |
| } |
| SECTION("Stream labels need to be valid") { |
| test_controller.plan->setProperty(push_grafana_loki_rest, PushGrafanaLokiREST::StreamLabels, "invalidlabels,invalidlabels2"); |
| } |
| REQUIRE_THROWS_AS(test_controller.trigger(), minifi::Exception); |
| } |
| |
| TEST_CASE("Log Line Batch Size cannot be 0", "[PushGrafanaLokiREST]") { |
| minifi::test::SingleProcessorTestController test_controller(minifi::test::utils::make_processor<PushGrafanaLokiREST>("PushGrafanaLokiREST")); |
| auto push_grafana_loki_rest = test_controller.getProcessor(); |
| test_controller.plan->setProperty(push_grafana_loki_rest, PushGrafanaLokiREST::Url, "localhost:10990"); |
| test_controller.plan->setProperty(push_grafana_loki_rest, PushGrafanaLokiREST::StreamLabels, "job=minifi,directory=/opt/minifi/logs/"); |
| test_controller.plan->setProperty(push_grafana_loki_rest, PushGrafanaLokiREST::LogLineBatchSize, "0"); |
| REQUIRE_THROWS_AS(test_controller.trigger(), minifi::Exception); |
| } |
| |
| class PushGrafanaLokiRESTTestFixture { |
| public: |
| PushGrafanaLokiRESTTestFixture() |
| : mock_loki_("10990"), |
| test_controller_(minifi::test::utils::make_processor<PushGrafanaLokiREST>("PushGrafanaLokiREST")), |
| push_grafana_loki_rest_(test_controller_.getProcessor()) { |
| LogTestController::getInstance().setDebug<TestPlan>(); |
| LogTestController::getInstance().setDebug<minifi::core::Processor>(); |
| LogTestController::getInstance().setTrace<minifi::core::ProcessSession>(); |
| LogTestController::getInstance().setTrace<PushGrafanaLokiREST>(); |
| test_controller_.plan->setProperty(push_grafana_loki_rest_, PushGrafanaLokiREST::Url, "localhost:10990"); |
| test_controller_.plan->setProperty(push_grafana_loki_rest_, PushGrafanaLokiREST::StreamLabels, "job=minifi,directory=/opt/minifi/logs/"); |
| } |
| |
| void setProperty(const auto& property, const std::string& property_value) { |
| test_controller_.plan->setProperty(push_grafana_loki_rest_, property, property_value); |
| } |
| |
| void verifyLastRequestIsEmpty() { |
| const auto& request = mock_loki_.getLastRequest(); |
| REQUIRE(request.IsNull()); |
| } |
| |
| void verifyTenantId(const std::string& tenant_id) { |
| REQUIRE(mock_loki_.getLastTenantId() == tenant_id); |
| } |
| |
| void verifyBasicAuthorization(const std::string& expected_username_and_password) { |
| auto last_authorization = mock_loki_.getLastAuthorization(); |
| std::string expected_authorization = "Basic "; |
| REQUIRE(minifi::utils::string::startsWith(last_authorization, expected_authorization)); |
| std::string username_and_password_decoded = minifi::utils::string::from_base64(last_authorization.substr(expected_authorization.size()), minifi::utils::as_string_tag_t{}); |
| REQUIRE(username_and_password_decoded == expected_username_and_password); |
| } |
| |
| void verifyBearerTokenAuthorization(const std::string& expected_bearer_token) { |
| auto last_authorization = mock_loki_.getLastAuthorization(); |
| std::string expected_authorization = "Bearer "; |
| REQUIRE(minifi::utils::string::startsWith(last_authorization, expected_authorization)); |
| auto bearer_token = last_authorization.substr(expected_authorization.size()); |
| REQUIRE(bearer_token == expected_bearer_token); |
| } |
| |
| void verifyStreamLabels() { |
| const auto& request = mock_loki_.getLastRequest(); |
| REQUIRE(request.HasMember("streams")); |
| const auto& stream_array = request["streams"].GetArray(); |
| REQUIRE(stream_array.Size() == 1); |
| REQUIRE(stream_array[0].HasMember("stream")); |
| const auto& stream = stream_array[0]["stream"].GetObject(); |
| REQUIRE(stream.HasMember("job")); |
| std::string job_string = stream["job"].GetString(); |
| REQUIRE(job_string == "minifi"); |
| REQUIRE(stream.HasMember("directory")); |
| std::string directory_string = stream["directory"].GetString(); |
| REQUIRE(directory_string == "/opt/minifi/logs/"); |
| } |
| |
| void verifySentRequestToLoki(uint64_t start_timestamp, const std::vector<std::string>& expected_log_values, |
| const std::vector<std::map<std::string, std::string>>& expected_log_line_attribute_values = {}) { |
| const auto& request = mock_loki_.getLastRequest(); |
| REQUIRE(request.HasMember("streams")); |
| const auto& stream_array = request["streams"].GetArray(); |
| REQUIRE(stream_array.Size() > 0); |
| REQUIRE(stream_array[0].HasMember("values")); |
| const auto& value_array = stream_array[0]["values"].GetArray(); |
| REQUIRE(value_array.Size() == expected_log_values.size()); |
| for (size_t i = 0; i < expected_log_values.size(); ++i) { |
| const auto& log_line_array = value_array[gsl::narrow<rapidjson::SizeType>(i)].GetArray(); |
| if (!expected_log_line_attribute_values.empty()) { |
| REQUIRE(log_line_array.Size() == 3); |
| } else { |
| REQUIRE(log_line_array.Size() == 2); |
| } |
| std::string timestamp_str = log_line_array[0].GetString(); |
| REQUIRE(start_timestamp <= std::stoull(timestamp_str)); |
| std::string log_value = log_line_array[1].GetString(); |
| REQUIRE(log_value == expected_log_values[i]); |
| if (!expected_log_line_attribute_values.empty()) { |
| REQUIRE(log_line_array[2].IsObject()); |
| const auto& log_line_attribute_object = log_line_array[2].GetObject(); |
| for (const auto& [key, value] : expected_log_line_attribute_values[i]) { |
| REQUIRE(log_line_attribute_object.HasMember(key.c_str())); |
| REQUIRE(log_line_attribute_object[key.c_str()].GetString() == value); |
| } |
| } |
| } |
| } |
| |
| void verifyTransferredFlowContent(const std::vector<std::shared_ptr<core::FlowFile>>& flow_files, const std::vector<std::string>& expected_log_values) const { |
| CHECK(flow_files.size() == expected_log_values.size()); |
| for (const auto& flow_file : flow_files) { |
| CHECK(std::find(expected_log_values.begin(), expected_log_values.end(), test_controller_.plan->getContent(flow_file)) != expected_log_values.end()); |
| } |
| } |
| |
| protected: |
| MockGrafanaLokiREST mock_loki_; |
| minifi::test::SingleProcessorTestController test_controller_; |
| TypedProcessorWrapper<PushGrafanaLokiREST> push_grafana_loki_rest_; |
| }; |
| |
| TEST_CASE_METHOD(PushGrafanaLokiRESTTestFixture, "PushGrafanaLokiREST should send 1 log line to Grafana Loki in a trigger", "[PushGrafanaLokiREST]") { |
| uint64_t start_timestamp = std::chrono::system_clock::now().time_since_epoch() / std::chrono::nanoseconds(1); |
| setProperty(PushGrafanaLokiREST::LogLineBatchSize, "1"); |
| setProperty(PushGrafanaLokiREST::MaxBatchSize, "1"); |
| auto results = test_controller_.trigger({minifi::test::InputFlowFileData{"孫子兵法", {}}, minifi::test::InputFlowFileData{"Война и мир", {}}}); |
| verifyStreamLabels(); |
| std::vector<std::string> expected_log_values = {"孫子兵法"}; |
| verifySentRequestToLoki(start_timestamp, expected_log_values); |
| verifyTransferredFlowContent(results.at(PushGrafanaLokiREST::Success), expected_log_values); |
| } |
| |
| TEST_CASE_METHOD(PushGrafanaLokiRESTTestFixture, "PushGrafanaLokiREST should wait for Log Line Batch Size limit to be reached", "[PushGrafanaLokiREST]") { |
| uint64_t start_timestamp = std::chrono::system_clock::now().time_since_epoch() / std::chrono::nanoseconds(1); |
| setProperty(PushGrafanaLokiREST::LogLineBatchSize, "4"); |
| setProperty(PushGrafanaLokiREST::MaxBatchSize, "2"); |
| auto results = test_controller_.trigger({minifi::test::InputFlowFileData{"log line 1", {}}, minifi::test::InputFlowFileData{"log line 2", {}}, minifi::test::InputFlowFileData{"log line 3", {}}}); |
| CHECK(results.at(PushGrafanaLokiREST::Success).empty()); |
| verifyLastRequestIsEmpty(); |
| results = test_controller_.trigger({minifi::test::InputFlowFileData{"log line 4", {}}}); |
| verifyStreamLabels(); |
| std::vector<std::string> expected_log_values = {"log line 1", "log line 2", "log line 3", "log line 4"}; |
| verifySentRequestToLoki(start_timestamp, expected_log_values); |
| verifyTransferredFlowContent(results.at(PushGrafanaLokiREST::Success), expected_log_values); |
| } |
| |
| TEST_CASE_METHOD(PushGrafanaLokiRESTTestFixture, "Multiple batches are sent in a single trigger", "[PushGrafanaLokiREST]") { |
| uint64_t start_timestamp = std::chrono::system_clock::now().time_since_epoch() / std::chrono::nanoseconds(1); |
| setProperty(PushGrafanaLokiREST::LogLineBatchSize, "2"); |
| setProperty(PushGrafanaLokiREST::MaxBatchSize, "4"); |
| auto results = test_controller_.trigger({minifi::test::InputFlowFileData{"log line 1", {}}, minifi::test::InputFlowFileData{"log line 2", {}}, minifi::test::InputFlowFileData{"log line 3", {}}, |
| minifi::test::InputFlowFileData{"log line 4", {}}, minifi::test::InputFlowFileData{"log line 5", {}}}); |
| verifyStreamLabels(); |
| std::vector<std::string> expected_log_values = {"log line 1", "log line 2", "log line 3", "log line 4"}; |
| verifySentRequestToLoki(start_timestamp, {"log line 3", "log line 4"}); |
| verifyTransferredFlowContent(results.at(PushGrafanaLokiREST::Success), expected_log_values); |
| } |
| |
| TEST_CASE_METHOD(PushGrafanaLokiRESTTestFixture, "If submitting to Grafana Loki fails then the flow files should be transferred to failure", "[PushGrafanaLokiREST]") { |
| setProperty(PushGrafanaLokiREST::Url, "http://invalid-url"); |
| setProperty(PushGrafanaLokiREST::LogLineBatchSize, "4"); |
| setProperty(PushGrafanaLokiREST::MaxBatchSize, "2"); |
| auto results = test_controller_.trigger({minifi::test::InputFlowFileData{"log line 1", {}}, minifi::test::InputFlowFileData{"log line 2", {}}, minifi::test::InputFlowFileData{"log line 3", {}}}); |
| CHECK(results.at(PushGrafanaLokiREST::Success).empty()); |
| CHECK(results.at(PushGrafanaLokiREST::Failure).empty()); |
| results = test_controller_.trigger({minifi::test::InputFlowFileData{"log line 4", {}}}); |
| std::vector<std::string> expected_log_values = {"log line 1", "log line 2", "log line 3", "log line 4"}; |
| CHECK(results.at(PushGrafanaLokiREST::Success).empty()); |
| verifyTransferredFlowContent(results.at(PushGrafanaLokiREST::Failure), expected_log_values); |
| } |
| |
| TEST_CASE_METHOD(PushGrafanaLokiRESTTestFixture, "If no log line batch limit is set, all log files in a single trigger should be processed", "[PushGrafanaLokiREST]") { |
| uint64_t start_timestamp = std::chrono::system_clock::now().time_since_epoch() / std::chrono::nanoseconds(1); |
| std::vector<std::string> expected_log_values; |
| SECTION("Max Batch Size is set") { |
| setProperty(PushGrafanaLokiREST::MaxBatchSize, "2"); |
| expected_log_values = {"log line 1", "log line 2"}; |
| } |
| SECTION("No Max Batch Size is set") { |
| expected_log_values = {"log line 1", "log line 2", "log line 3"}; |
| } |
| auto results = test_controller_.trigger({minifi::test::InputFlowFileData{"log line 1", {}}, minifi::test::InputFlowFileData{"log line 2", {}}, minifi::test::InputFlowFileData{"log line 3", {}}}); |
| verifyStreamLabels(); |
| verifySentRequestToLoki(start_timestamp, expected_log_values); |
| verifyTransferredFlowContent(results.at(PushGrafanaLokiREST::Success), expected_log_values); |
| } |
| |
| TEST_CASE_METHOD(PushGrafanaLokiRESTTestFixture, "Log line metadata can be added with flow file attributes", "[PushGrafanaLokiREST]") { |
| uint64_t start_timestamp = std::chrono::system_clock::now().time_since_epoch() / std::chrono::nanoseconds(1); |
| setProperty(PushGrafanaLokiREST::MaxBatchSize, "2"); |
| setProperty(PushGrafanaLokiREST::LogLineMetadataAttributes, "label1, label2, label3"); |
| auto results = test_controller_.trigger({minifi::test::InputFlowFileData{"log line 1", {{"label1", "value1"}, {"label4", "value4"}}}, |
| minifi::test::InputFlowFileData{"log line 2", {{"label1", "value1"}, {"label2", "value2"}}}, minifi::test::InputFlowFileData{"log line 3", {}}}); |
| verifyStreamLabels(); |
| std::vector<std::string> expected_log_values = {"log line 1", "log line 2"}; |
| std::vector<std::map<std::string, std::string>> expected_log_line_attribute_values = {{{"label1", "value1"}}, {{"label1", "value1"}, {"label2", "value2"}}}; |
| verifySentRequestToLoki(start_timestamp, expected_log_values, expected_log_line_attribute_values); |
| verifyTransferredFlowContent(results.at(PushGrafanaLokiREST::Success), expected_log_values); |
| } |
| |
| TEST_CASE_METHOD(PushGrafanaLokiRESTTestFixture, "Tenant ID can be set in properties", "[PushGrafanaLokiREST]") { |
| setProperty(PushGrafanaLokiREST::LogLineBatchSize, "1"); |
| setProperty(PushGrafanaLokiREST::MaxBatchSize, "1"); |
| setProperty(PushGrafanaLokiREST::TenantID, "mytenant"); |
| auto results = test_controller_.trigger({minifi::test::InputFlowFileData{"log line 1", {}}}); |
| verifyTenantId("mytenant"); |
| } |
| |
| TEST_CASE_METHOD(PushGrafanaLokiRESTTestFixture, "PushGrafanaLokiREST should wait for Log Line Batch Wait time to be reached", "[PushGrafanaLokiREST]") { |
| uint64_t start_timestamp = std::chrono::system_clock::now().time_since_epoch() / std::chrono::nanoseconds(1); |
| setProperty(PushGrafanaLokiREST::LogLineBatchWait, "200 ms"); |
| setProperty(PushGrafanaLokiREST::MaxBatchSize, "3"); |
| auto results = test_controller_.trigger({minifi::test::InputFlowFileData{"log line 1", {}}, minifi::test::InputFlowFileData{"log line 2", {}}, minifi::test::InputFlowFileData{"log line 3", {}}}); |
| verifyLastRequestIsEmpty(); |
| std::this_thread::sleep_for(300ms); |
| std::vector<std::string> expected_log_values; |
| SECTION("Trigger with new flow file") { |
| results = test_controller_.trigger({minifi::test::InputFlowFileData{"log line 4", {}}}); |
| expected_log_values = {"log line 1", "log line 2", "log line 3", "log line 4"}; |
| } |
| SECTION("Trigger without new flow file should also send the batch") { |
| results = test_controller_.trigger(std::vector<minifi::test::InputFlowFileData>{}); |
| expected_log_values = {"log line 1", "log line 2", "log line 3"}; |
| } |
| verifyStreamLabels(); |
| verifySentRequestToLoki(start_timestamp, expected_log_values); |
| verifyTransferredFlowContent(results.at(PushGrafanaLokiREST::Success), expected_log_values); |
| } |
| |
| TEST_CASE("If username is set, password is also required to be set", "[PushGrafanaLokiREST]") { |
| minifi::test::SingleProcessorTestController test_controller(minifi::test::utils::make_processor<PushGrafanaLokiREST>("PushGrafanaLokiREST")); |
| auto push_grafana_loki_rest = test_controller.getProcessor(); |
| test_controller.plan->setProperty(push_grafana_loki_rest, PushGrafanaLokiREST::Url, "localhost:10990"); |
| test_controller.plan->setProperty(push_grafana_loki_rest, PushGrafanaLokiREST::StreamLabels, "job=minifi,directory=/opt/minifi/logs/"); |
| test_controller.plan->setProperty(push_grafana_loki_rest, PushGrafanaLokiREST::LogLineBatchSize, "1"); |
| test_controller.plan->setProperty(push_grafana_loki_rest, PushGrafanaLokiREST::Username, "admin"); |
| REQUIRE_THROWS_AS(test_controller.trigger(), minifi::Exception); |
| } |
| |
| TEST_CASE_METHOD(PushGrafanaLokiRESTTestFixture, "Basic authentication is set in HTTP header", "[PushGrafanaLokiREST]") { |
| setProperty(PushGrafanaLokiREST::LogLineBatchSize, "1"); |
| setProperty(PushGrafanaLokiREST::Username, "admin"); |
| setProperty(PushGrafanaLokiREST::Password, "admin"); |
| setProperty(PushGrafanaLokiREST::BearerTokenFile, "mytoken"); // Basic authentication should take precedence |
| auto results = test_controller_.trigger({minifi::test::InputFlowFileData{"log line 1", {}}}); |
| verifyBasicAuthorization("admin:admin"); |
| } |
| |
| TEST_CASE_METHOD(PushGrafanaLokiRESTTestFixture, "Bearer token is set for authentication", "[PushGrafanaLokiREST]") { |
| auto temp_dir = test_controller_.createTempDirectory(); |
| auto test_file_path = minifi::test::utils::putFileToDir(temp_dir, "test1.txt", "mytoken\n"); |
| setProperty(PushGrafanaLokiREST::LogLineBatchSize, "1"); |
| setProperty(PushGrafanaLokiREST::BearerTokenFile, test_file_path.string()); |
| auto results = test_controller_.trigger({minifi::test::InputFlowFileData{"log line 1", {}}}); |
| verifyBearerTokenAuthorization("mytoken"); |
| } |
| |
| } // namespace org::apache::nifi::minifi::extensions::grafana::loki::test |
| |
| #ifdef WIN32 |
| #pragma pop_macro("GetObject") |
| #endif |