blob: 02aae265074d8d0133ba0abdc7cdc2cde9f55718 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "ConsumeWindowsEventLog.h"
#include "core/ConfigurableComponent.h"
#include "processors/LogAttribute.h"
#include "TestBase.h"
using ConsumeWindowsEventLog = org::apache::nifi::minifi::processors::ConsumeWindowsEventLog;
using LogAttribute = org::apache::nifi::minifi::processors::LogAttribute;
using ConfigurableComponent = org::apache::nifi::minifi::core::ConfigurableComponent;
using IdGenerator = org::apache::nifi::minifi::utils::IdGenerator;
namespace {
core::Relationship Success{"success", "Everything is fine"};
const std::string APPLICATION_CHANNEL = "Application";
constexpr DWORD CWEL_TESTS_OPCODE = 14985; // random opcode hopefully won't clash with something important
const std::string QUERY = "Event/System/EventID=" + std::to_string(CWEL_TESTS_OPCODE);
void reportEvent(const std::string& channel, const char* message, WORD log_level = EVENTLOG_INFORMATION_TYPE) {
auto event_source = RegisterEventSourceA(nullptr, channel.c_str());
auto deleter = gsl::finally([&event_source](){ DeregisterEventSource(event_source); });
ReportEventA(event_source, log_level, 0, CWEL_TESTS_OPCODE, nullptr, 1, 0, &message, nullptr);
}
} // namespace
TEST_CASE("ConsumeWindowsEventLog constructor works", "[create]") {
TestController test_controller;
std::shared_ptr<TestPlan> test_plan = test_controller.createPlan();
REQUIRE_NOTHROW(ConsumeWindowsEventLog processor_one("one"));
REQUIRE_NOTHROW(
utils::Identifier uuid = utils::IdGenerator::getIdGenerator()->generate();
ConsumeWindowsEventLog processor_two("two", uuid);
); // NOLINT
REQUIRE_NOTHROW(
auto processor = test_plan->addProcessor("ConsumeWindowsEventLog", "cwel");
); // NOLINT
}
TEST_CASE("ConsumeWindowsEventLog properties work with default values", "[create][properties]") {
TestController test_controller;
LogTestController::getInstance().setDebug<ConfigurableComponent>();
LogTestController::getInstance().setTrace<ConsumeWindowsEventLog>();
std::shared_ptr<TestPlan> test_plan = test_controller.createPlan();
auto processor = test_plan->addProcessor("ConsumeWindowsEventLog", "cwel");
test_controller.runSession(test_plan);
auto properties_required_or_with_default_value = {
ConsumeWindowsEventLog::Channel,
ConsumeWindowsEventLog::Query,
// ConsumeWindowsEventLog::RenderFormatXML, // FIXME(fgerlits): not defined, does not exist in NiFi either; should be removed
ConsumeWindowsEventLog::MaxBufferSize,
// ConsumeWindowsEventLog::InactiveDurationToReconnect, // FIXME(fgerlits): obsolete, see definition; should be removed
ConsumeWindowsEventLog::IdentifierMatcher,
ConsumeWindowsEventLog::IdentifierFunction,
ConsumeWindowsEventLog::ResolveAsAttributes,
ConsumeWindowsEventLog::EventHeader,
ConsumeWindowsEventLog::OutputFormat,
ConsumeWindowsEventLog::BatchCommitSize,
ConsumeWindowsEventLog::BookmarkRootDirectory, // TODO(fgerlits): obsolete, see definition; remove in a later release
ConsumeWindowsEventLog::ProcessOldEvents
};
for (const core::Property& property : properties_required_or_with_default_value) {
if (!LogTestController::getInstance().contains("property name " + property.getName() + " value ")) {
FAIL("Property did not get queried: " << property.getName());
}
}
auto properties_optional_without_default_value = {
ConsumeWindowsEventLog::EventHeaderDelimiter
};
for (const core::Property& property : properties_optional_without_default_value) {
if (!LogTestController::getInstance().contains("property name " + property.getName() + ", empty value")) {
FAIL("Optional property did not get queried: " << property.getName());
}
}
REQUIRE(LogTestController::getInstance().contains("Successfully configured CWEL"));
}
TEST_CASE("ConsumeWindowsEventLog onSchedule throws if it cannot create the bookmark", "[create][bookmark]") {
TestController test_controller;
std::shared_ptr<TestPlan> test_plan = test_controller.createPlan();
auto processor = test_plan->addProcessor("ConsumeWindowsEventLog", "cwel");
test_plan->setProperty(processor, ConsumeWindowsEventLog::Channel.getName(), "NonexistentChannel1234981");
REQUIRE_THROWS_AS(test_controller.runSession(test_plan), minifi::Exception);
}
TEST_CASE("ConsumeWindowsEventLog can consume new events", "[onTrigger]") {
TestController test_controller;
LogTestController::getInstance().setDebug<ConsumeWindowsEventLog>();
LogTestController::getInstance().setDebug<LogAttribute>();
std::shared_ptr<TestPlan> test_plan = test_controller.createPlan();
auto cwel_processor = test_plan->addProcessor("ConsumeWindowsEventLog", "cwel");
test_plan->setProperty(cwel_processor, ConsumeWindowsEventLog::Channel.getName(), APPLICATION_CHANNEL);
test_plan->setProperty(cwel_processor, ConsumeWindowsEventLog::Query.getName(), QUERY);
auto logger_processor = test_plan->addProcessor("LogAttribute", "logger", Success, true);
test_plan->setProperty(logger_processor, LogAttribute::FlowFilesToLog.getName(), "0");
test_plan->setProperty(logger_processor, LogAttribute::LogPayload.getName(), "true");
test_plan->setProperty(logger_processor, LogAttribute::MaxPayloadLineLength.getName(), "1024");
reportEvent(APPLICATION_CHANNEL, "Event zero");
test_controller.runSession(test_plan);
REQUIRE(LogTestController::getInstance().contains("processed 0 Events"));
// event zero is not reported as the bookmark is created on the first run
// and we use the default config setting ProcessOldEvents = false
// later runs will start with a bookmark saved in the state manager
test_plan->reset();
LogTestController::getInstance().resetStream(LogTestController::getInstance().log_output);
SECTION("Read one event") {
reportEvent(APPLICATION_CHANNEL, "Event one");
test_controller.runSession(test_plan);
REQUIRE(LogTestController::getInstance().contains("processed 1 Events"));
REQUIRE(LogTestController::getInstance().contains("<EventData><Data>Event one</Data></EventData>"));
// make sure timezone attributes are present
REQUIRE(LogTestController::getInstance().contains("key:Timezone offset value:"));
REQUIRE(LogTestController::getInstance().contains("key:Timezone name value:"));
}
SECTION("Read two events") {
reportEvent(APPLICATION_CHANNEL, "Event two");
reportEvent(APPLICATION_CHANNEL, "Event three");
test_controller.runSession(test_plan);
REQUIRE(LogTestController::getInstance().contains("processed 2 Events"));
REQUIRE(LogTestController::getInstance().contains("<EventData><Data>Event two</Data></EventData>"));
REQUIRE(LogTestController::getInstance().contains("<EventData><Data>Event three</Data></EventData>"));
}
}
TEST_CASE("ConsumeWindowsEventLog bookmarking works", "[onTrigger]") {
TestController test_controller;
LogTestController::getInstance().setDebug<ConsumeWindowsEventLog>();
LogTestController::getInstance().setDebug<LogAttribute>();
std::shared_ptr<TestPlan> test_plan = test_controller.createPlan();
auto cwel_processor = test_plan->addProcessor("ConsumeWindowsEventLog", "cwel");
test_plan->setProperty(cwel_processor, ConsumeWindowsEventLog::Channel.getName(), APPLICATION_CHANNEL);
test_plan->setProperty(cwel_processor, ConsumeWindowsEventLog::Query.getName(), QUERY);
auto logger_processor = test_plan->addProcessor("LogAttribute", "logger", Success, true);
test_plan->setProperty(logger_processor, LogAttribute::FlowFilesToLog.getName(), "0");
reportEvent(APPLICATION_CHANNEL, "Event zero");
test_controller.runSession(test_plan);
REQUIRE(LogTestController::getInstance().contains("processed 0 Events"));
test_plan->reset();
LogTestController::getInstance().resetStream(LogTestController::getInstance().log_output);
SECTION("Read in one go") {
reportEvent(APPLICATION_CHANNEL, "Event one");
reportEvent(APPLICATION_CHANNEL, "Event two");
reportEvent(APPLICATION_CHANNEL, "Event three");
test_controller.runSession(test_plan);
REQUIRE(LogTestController::getInstance().contains("processed 3 Events"));
}
SECTION("Read in two batches") {
reportEvent(APPLICATION_CHANNEL, "Event one");
test_controller.runSession(test_plan);
REQUIRE(LogTestController::getInstance().contains("processed 1 Events"));
reportEvent(APPLICATION_CHANNEL, "Event two");
reportEvent(APPLICATION_CHANNEL, "Event three");
test_plan->reset();
LogTestController::getInstance().resetStream(LogTestController::getInstance().log_output);
test_controller.runSession(test_plan);
REQUIRE(LogTestController::getInstance().contains("processed 2 Events"));
}
}
TEST_CASE("ConsumeWindowsEventLog extracts some attributes by default", "[onTrigger]") {
TestController test_controller;
LogTestController::getInstance().setDebug<ConsumeWindowsEventLog>();
LogTestController::getInstance().setDebug<LogAttribute>();
std::shared_ptr<TestPlan> test_plan = test_controller.createPlan();
auto cwel_processor = test_plan->addProcessor("ConsumeWindowsEventLog", "cwel");
test_plan->setProperty(cwel_processor, ConsumeWindowsEventLog::Channel.getName(), APPLICATION_CHANNEL);
test_plan->setProperty(cwel_processor, ConsumeWindowsEventLog::Query.getName(), QUERY);
auto logger_processor = test_plan->addProcessor("LogAttribute", "logger", Success, true);
test_plan->setProperty(logger_processor, LogAttribute::FlowFilesToLog.getName(), "0");
// 0th event, only to create a bookmark
{
reportEvent(APPLICATION_CHANNEL, "Event zero: this is in the past");
test_controller.runSession(test_plan);
}
test_plan->reset();
LogTestController::getInstance().resetStream(LogTestController::getInstance().log_output);
// 1st event, on Info level
{
reportEvent(APPLICATION_CHANNEL, "Event one: something interesting happened", EVENTLOG_INFORMATION_TYPE);
test_controller.runSession(test_plan);
REQUIRE(LogTestController::getInstance().contains("key:Keywords value:Classic"));
REQUIRE(LogTestController::getInstance().contains("key:Level value:Information"));
}
test_plan->reset();
LogTestController::getInstance().resetStream(LogTestController::getInstance().log_output);
// 2st event, on Warning level
{
reportEvent(APPLICATION_CHANNEL, "Event two: something fishy happened!", EVENTLOG_WARNING_TYPE);
test_controller.runSession(test_plan);
REQUIRE(LogTestController::getInstance().contains("key:Keywords value:Classic"));
REQUIRE(LogTestController::getInstance().contains("key:Level value:Warning"));
}
}
namespace {
void outputFormatSetterTestHelper(const std::string &output_format, int expected_num_flow_files) {
TestController test_controller;
LogTestController::getInstance().setDebug<ConsumeWindowsEventLog>();
LogTestController::getInstance().setDebug<LogAttribute>();
std::shared_ptr<TestPlan> test_plan = test_controller.createPlan();
auto cwel_processor = test_plan->addProcessor("ConsumeWindowsEventLog", "cwel");
test_plan->setProperty(cwel_processor, ConsumeWindowsEventLog::Channel.getName(), APPLICATION_CHANNEL);
test_plan->setProperty(cwel_processor, ConsumeWindowsEventLog::Query.getName(), QUERY);
test_plan->setProperty(cwel_processor, ConsumeWindowsEventLog::OutputFormat.getName(), output_format);
auto logger_processor = test_plan->addProcessor("LogAttribute", "logger", Success, true);
test_plan->setProperty(logger_processor, LogAttribute::FlowFilesToLog.getName(), "0");
{
reportEvent(APPLICATION_CHANNEL, "Event zero: this is in the past");
test_controller.runSession(test_plan);
}
test_plan->reset();
LogTestController::getInstance().resetStream(LogTestController::getInstance().log_output);
{
reportEvent(APPLICATION_CHANNEL, "Event one");
test_controller.runSession(test_plan);
REQUIRE(LogTestController::getInstance().contains("Logged " + std::to_string(expected_num_flow_files) + " flow files"));
}
}
} // namespace
TEST_CASE("ConsumeWindowsEventLog output format can be set", "[create][output_format]") {
outputFormatSetterTestHelper("XML", 1);
outputFormatSetterTestHelper("Plaintext", 1);
outputFormatSetterTestHelper("Both", 2);
// NOTE(fgerlits): this may be a bug, as I would expect this to throw in onSchedule(),
// but it starts merrily, just does not write flow files in either format
outputFormatSetterTestHelper("InvalidValue", 0);
}
// NOTE(fgerlits): I don't know how to unit test this, as my manually published events all result in an empty string if OutputFormat is Plaintext
// but it does seem to work, based on manual tests reading system logs
// TEST_CASE("ConsumeWindowsEventLog prints events in plain text correctly", "[onTrigger]")
TEST_CASE("ConsumeWindowsEventLog prints events in XML correctly", "[onTrigger]") {
TestController test_controller;
LogTestController::getInstance().setDebug<ConsumeWindowsEventLog>();
LogTestController::getInstance().setDebug<LogAttribute>();
std::shared_ptr<TestPlan> test_plan = test_controller.createPlan();
auto cwel_processor = test_plan->addProcessor("ConsumeWindowsEventLog", "cwel");
test_plan->setProperty(cwel_processor, ConsumeWindowsEventLog::Channel.getName(), APPLICATION_CHANNEL);
test_plan->setProperty(cwel_processor, ConsumeWindowsEventLog::Query.getName(), QUERY);
test_plan->setProperty(cwel_processor, ConsumeWindowsEventLog::OutputFormat.getName(), "XML");
auto logger_processor = test_plan->addProcessor("LogAttribute", "logger", Success, true);
test_plan->setProperty(logger_processor, LogAttribute::FlowFilesToLog.getName(), "0");
test_plan->setProperty(logger_processor, LogAttribute::LogPayload.getName(), "true");
test_plan->setProperty(logger_processor, LogAttribute::MaxPayloadLineLength.getName(), "1024");
{
reportEvent(APPLICATION_CHANNEL, "Event zero: this is in the past");
test_controller.runSession(test_plan);
}
test_plan->reset();
LogTestController::getInstance().resetStream(LogTestController::getInstance().log_output);
{
reportEvent(APPLICATION_CHANNEL, "Event one");
test_controller.runSession(test_plan);
REQUIRE(LogTestController::getInstance().contains(R"(<Event xmlns="http://schemas.microsoft.com/win/2004/08/events/event"><System><Provider Name="Application"/>)"));
REQUIRE(LogTestController::getInstance().contains(R"(<EventID Qualifiers="0">14985</EventID>)"));
REQUIRE(LogTestController::getInstance().contains(R"(<Level>4</Level>)"));
REQUIRE(LogTestController::getInstance().contains(R"(<Task>0</Task>)"));
REQUIRE(LogTestController::getInstance().contains(R"(<Keywords>0x80000000000000</Keywords><TimeCreated SystemTime=")"));
// the timestamp (when the event was published) goes here
REQUIRE(LogTestController::getInstance().contains(R"("/><EventRecordID>)"));
// the ID of the event goes here (a number)
REQUIRE(LogTestController::getInstance().contains(R"(</EventRecordID>)"));
REQUIRE(LogTestController::getInstance().contains(R"(<Channel>Application</Channel><Computer>)"));
// the computer name goes here
REQUIRE(LogTestController::getInstance().contains(R"(</Computer><Security/></System><EventData><Data>Event one</Data></EventData></Event>)"));
}
}
namespace {
void batchCommitSizeTestHelper(std::size_t num_events_read, std::size_t batch_commit_size, std::size_t expected_event_count) {
TestController test_controller;
LogTestController::getInstance().setDebug<ConsumeWindowsEventLog>();
LogTestController::getInstance().setDebug<LogAttribute>();
std::shared_ptr<TestPlan> test_plan = test_controller.createPlan();
auto cwel_processor = test_plan->addProcessor("ConsumeWindowsEventLog", "cwel");
test_plan->setProperty(cwel_processor, ConsumeWindowsEventLog::Channel.getName(), APPLICATION_CHANNEL);
test_plan->setProperty(cwel_processor, ConsumeWindowsEventLog::Query.getName(), QUERY);
test_plan->setProperty(cwel_processor, ConsumeWindowsEventLog::OutputFormat.getName(), "XML");
test_plan->setProperty(cwel_processor, ConsumeWindowsEventLog::BatchCommitSize.getName(), std::to_string(batch_commit_size));
{
reportEvent(APPLICATION_CHANNEL, "Event zero: this is in the past");
test_controller.runSession(test_plan);
}
test_plan->reset();
LogTestController::getInstance().resetStream(LogTestController::getInstance().log_output);
auto generate_events = [](const std::size_t event_count) {
std::vector<std::string> events;
for (auto i = 0; i < event_count; ++i) {
events.push_back("Event " + std::to_string(i));
}
return events;
};
for (const auto& event : generate_events(num_events_read))
reportEvent(APPLICATION_CHANNEL, event.c_str());
test_controller.runSession(test_plan);
REQUIRE(LogTestController::getInstance().contains("processed " + std::to_string(expected_event_count) + " Events"));
}
} // namespace
TEST_CASE("ConsumeWindowsEventLog batch commit size works", "[onTrigger]") {
batchCommitSizeTestHelper(5, 1000, 5);
batchCommitSizeTestHelper(5, 5, 5);
batchCommitSizeTestHelper(5, 4, 4);
batchCommitSizeTestHelper(5, 1, 1);
batchCommitSizeTestHelper(5, 0, 5);
}