blob: 8cc4a6fb3fea3d1a876aea0923a9e35ab2fee368 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <array>
#include <chrono>
#include "QuerySplunkIndexingStatus.h"
#include "MockSplunkHEC.h"
#include "SplunkAttributes.h"
#include "unit/TestBase.h"
#include "unit/Catch.h"
#include "processors/UpdateAttribute.h"
#include "unit/ReadFromFlowFileTestProcessor.h"
#include "unit/WriteToFlowFileTestProcessor.h"
using QuerySplunkIndexingStatus = org::apache::nifi::minifi::extensions::splunk::QuerySplunkIndexingStatus;
using ReadFromFlowFileTestProcessor = org::apache::nifi::minifi::processors::ReadFromFlowFileTestProcessor;
using WriteToFlowFileTestProcessor = org::apache::nifi::minifi::processors::WriteToFlowFileTestProcessor;
using UpdateAttribute = org::apache::nifi::minifi::processors::UpdateAttribute;
using namespace std::literals::chrono_literals;
TEST_CASE("QuerySplunkIndexingStatus tests", "[querysplunkindexingstatus]") {
MockSplunkHEC mock_splunk_hec("10132");
TestController test_controller;
auto plan = test_controller.createPlan();
auto write_to_flow_file = plan->addProcessor<WriteToFlowFileTestProcessor>("write_to_flow_file");
auto update_attribute = plan->addProcessor("UpdateAttribute", "update_attribute");
auto query_splunk_indexing_status = plan->addProcessor("QuerySplunkIndexingStatus", "query_splunk_indexing_status");
auto read_from_acknowledged = plan->addProcessor<ReadFromFlowFileTestProcessor>("read_from_acknowledged");
auto read_from_undetermined = plan->addProcessor<ReadFromFlowFileTestProcessor>("read_from_undetermined");
auto read_from_unacknowledged = plan->addProcessor<ReadFromFlowFileTestProcessor>("read_from_unacknowledged");
auto read_from_failure = plan->addProcessor<ReadFromFlowFileTestProcessor>("read_from_failure");
plan->addConnection(write_to_flow_file, WriteToFlowFileTestProcessor::Success, update_attribute);
plan->addConnection(update_attribute, UpdateAttribute ::Success, query_splunk_indexing_status);
plan->addConnection(query_splunk_indexing_status, QuerySplunkIndexingStatus::Acknowledged, read_from_acknowledged);
plan->addConnection(query_splunk_indexing_status, QuerySplunkIndexingStatus::Undetermined, read_from_undetermined);
plan->addConnection(query_splunk_indexing_status, QuerySplunkIndexingStatus::Unacknowledged, read_from_unacknowledged);
plan->addConnection(query_splunk_indexing_status, QuerySplunkIndexingStatus::Failure, read_from_failure);
read_from_acknowledged->setAutoTerminatedRelationships(std::array{core::Relationship{ReadFromFlowFileTestProcessor::Success}});
read_from_undetermined->setAutoTerminatedRelationships(std::array{core::Relationship{ReadFromFlowFileTestProcessor::Success}});
read_from_unacknowledged->setAutoTerminatedRelationships(std::array{core::Relationship{ReadFromFlowFileTestProcessor::Success}});
read_from_failure->setAutoTerminatedRelationships(std::array{core::Relationship{ReadFromFlowFileTestProcessor::Success}});
plan->setProperty(query_splunk_indexing_status, QuerySplunkIndexingStatus::Hostname, "localhost");
plan->setProperty(query_splunk_indexing_status, QuerySplunkIndexingStatus::Port, mock_splunk_hec.getPort());
plan->setProperty(query_splunk_indexing_status, QuerySplunkIndexingStatus::Token, MockSplunkHEC::TOKEN);
plan->setProperty(query_splunk_indexing_status, QuerySplunkIndexingStatus::SplunkRequestChannel, "a12254b4-f481-435d-896d-3b6033eabe58");
auto response_timestamp = std::to_string(std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now().time_since_epoch()).count());
plan->setDynamicProperty(update_attribute, org::apache::nifi::minifi::extensions::splunk::SPLUNK_RESPONSE_TIME, response_timestamp);
write_to_flow_file.get().setContent("foobar");
SECTION("Querying indexed id") {
plan->setDynamicProperty(update_attribute, org::apache::nifi::minifi::extensions::splunk::SPLUNK_ACK_ID, std::to_string(MockSplunkHEC::indexed_events[0]));
test_controller.runSession(plan);
CHECK(read_from_failure.get().numberOfFlowFilesRead() == 0);
CHECK(read_from_undetermined.get().numberOfFlowFilesRead() == 0);
CHECK(read_from_unacknowledged.get().numberOfFlowFilesRead() == 0);
CHECK(read_from_acknowledged.get().numberOfFlowFilesRead() == 1);
}
SECTION("Querying not indexed id") {
plan->setDynamicProperty(update_attribute, org::apache::nifi::minifi::extensions::splunk::SPLUNK_ACK_ID, "100");
query_splunk_indexing_status->setPenalizationPeriod(50ms);
test_controller.runSession(plan);
CHECK(read_from_failure.get().numberOfFlowFilesRead() == 0);
CHECK(read_from_undetermined.get().numberOfFlowFilesRead() == 0); // result penalized
CHECK(read_from_unacknowledged.get().numberOfFlowFilesRead() == 0);
CHECK(read_from_acknowledged.get().numberOfFlowFilesRead() == 0);
write_to_flow_file.get().setContent("");
plan->reset();
std::this_thread::sleep_for(100ms);
test_controller.runSession(plan);
CHECK(read_from_failure.get().numberOfFlowFilesRead() == 0);
CHECK(read_from_undetermined.get().numberOfFlowFilesRead() == 1);
CHECK(read_from_unacknowledged.get().numberOfFlowFilesRead() == 0);
CHECK(read_from_acknowledged.get().numberOfFlowFilesRead() == 0);
}
SECTION("Querying not indexed old id") {
plan->setDynamicProperty(update_attribute, org::apache::nifi::minifi::extensions::splunk::SPLUNK_ACK_ID, "100");
response_timestamp = std::to_string(std::chrono::duration_cast<std::chrono::milliseconds>((std::chrono::system_clock::now() - 2h).time_since_epoch()).count());
plan->setDynamicProperty(update_attribute, org::apache::nifi::minifi::extensions::splunk::SPLUNK_RESPONSE_TIME, response_timestamp);
test_controller.runSession(plan);
CHECK(read_from_failure.get().numberOfFlowFilesRead() == 0);
CHECK(read_from_undetermined.get().numberOfFlowFilesRead() == 0);
CHECK(read_from_unacknowledged.get().numberOfFlowFilesRead() == 1);
CHECK(read_from_acknowledged.get().numberOfFlowFilesRead() == 0);
}
SECTION("Multiple inputs with same id") {
plan->setDynamicProperty(update_attribute, org::apache::nifi::minifi::extensions::splunk::SPLUNK_ACK_ID, std::to_string(MockSplunkHEC::indexed_events[0]));
plan->setDynamicProperty(update_attribute, org::apache::nifi::minifi::extensions::splunk::SPLUNK_RESPONSE_TIME, response_timestamp);
for (size_t i = 0; i < 4; ++i) {
plan->runProcessor(write_to_flow_file);
plan->runProcessor(update_attribute);
}
plan->runProcessor(query_splunk_indexing_status);
plan->runProcessor(read_from_failure);
plan->runProcessor(read_from_undetermined);
plan->runProcessor(read_from_unacknowledged);
plan->runProcessor(read_from_acknowledged);
CHECK(read_from_failure.get().numberOfFlowFilesRead() == 4);
CHECK(read_from_undetermined.get().numberOfFlowFilesRead() == 0);
CHECK(read_from_unacknowledged.get().numberOfFlowFilesRead() == 0);
CHECK(read_from_acknowledged.get().numberOfFlowFilesRead() == 0);
}
SECTION("MaxQuerySize can limit the number of queries") {
plan->setProperty(query_splunk_indexing_status, QuerySplunkIndexingStatus::MaxQuerySize, "5");
for (size_t i = 0; i < 10; ++i) {
plan->runProcessor(write_to_flow_file);
plan->runProcessor(update_attribute);
}
plan->runProcessor(query_splunk_indexing_status);
CHECK(plan->getNumFlowFileProducedByProcessor(query_splunk_indexing_status) == 5);
}
SECTION("Input flow file has no attributes") {
test_controller.runSession(plan);
CHECK(read_from_failure.get().numberOfFlowFilesRead() == 1);
CHECK(read_from_undetermined.get().numberOfFlowFilesRead() == 0);
CHECK(read_from_unacknowledged.get().numberOfFlowFilesRead() == 0);
CHECK(read_from_acknowledged.get().numberOfFlowFilesRead() == 0);
}
SECTION("Invalid index") {
plan->setDynamicProperty(update_attribute, org::apache::nifi::minifi::extensions::splunk::SPLUNK_ACK_ID, "foo");
REQUIRE_THROWS(test_controller.runSession(plan));
}
}