blob: f524f1d9b8de464d09876f76732dd91e8f761503 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "ConsumeWindowsEventLog.h"
#include <cstdio>
#include <vector>
#include <tuple>
#include <utility>
#include <map>
#include <string>
#include <memory>
#include "wel/Bookmark.h"
#include "wel/LookupCacher.h"
#include "wel/MetadataWalker.h"
#include "wel/StringSplitter.h"
#include "wel/XMLString.h"
#include "wel/JSONUtils.h"
#include "wel/WindowsError.h"
#include "minifi-cpp/core/ProcessContext.h"
#include "core/ProcessSession.h"
#include "core/Resource.h"
#include "utils/Deleters.h"
#include "core/logging/LoggerFactory.h"
#include "minifi-cpp/utils/gsl.h"
#include "utils/RegexUtils.h"
#include "utils/StringUtils.h"
#include "utils/UnicodeConversion.h"
#include "utils/ProcessorConfigUtils.h"
#pragma comment(lib, "wevtapi.lib")
#pragma comment(lib, "ole32.lib")
using namespace std::literals::chrono_literals;
namespace {
// Returns a stopwatch: a callable that reports how much steady-clock time has
// passed since createTimer() itself was called.
auto createTimer() {
  const auto started_at = std::chrono::steady_clock::now();
  return [started_at] {
    const auto current = std::chrono::steady_clock::now();
    return current - started_at;
  };
}
}  // namespace
namespace org::apache::nifi::minifi::wel {
std::function<bool(std::string_view)> parseSidMatcher(const std::optional<std::string>& sid_matcher) {
if (!sid_matcher || sid_matcher->empty()) {
return [](std::string_view){ return false; };
}
if (std::smatch match; utils::regexMatch(*sid_matcher, match, utils::Regex{R"_(\.\*(\w+))_"})) {
std::string suffix = match[1];
return [suffix](std::string_view field_name) { return utils::string::endsWith(field_name, suffix); };
}
utils::Regex sid_matcher_regex{*sid_matcher};
return [sid_matcher_regex](std::string_view field_name) { return utils::regexMatch(field_name, sid_matcher_regex); };
}
} // namespace org::apache::nifi::minifi::wel
namespace org::apache::nifi::minifi::processors {
// Constructs the processor and caches the local computer name in computerName_.
// If the name cannot be queried the failure is logged and computerName_ keeps
// its default value.
ConsumeWindowsEventLog::ConsumeWindowsEventLog(core::ProcessorMetadata metadata)
    : core::ProcessorImpl{std::move(metadata)} {
  char name_buffer[MAX_COMPUTERNAME_LENGTH + 1];
  DWORD buffer_size = sizeof(name_buffer);
  if (!GetComputerName(name_buffer, &buffer_size)) {
    logger_->log_error("GetComputerName failed due to {}", wel::getLastError());
    return;
  }
  computerName_ = name_buffer;
}
// Called when the processor is stopped: drops the bookmark and unloads
// msobjs.dll if it was loaded. Takes on_trigger_mutex_, so it waits for a
// running onTrigger() (which holds the same mutex) to finish first.
void ConsumeWindowsEventLog::notifyStop() {
  std::lock_guard<std::mutex> trigger_guard(on_trigger_mutex_);
  logger_->log_trace("start notifyStop");

  bookmark_.reset();

  if (hMsobjsDll_) {
    if (!FreeLibrary(hMsobjsDll_)) {
      // Keep the handle on failure; the destructor will try once more.
      logger_->log_error("FreeLibrary failed due to {}", wel::getLastError());
    } else {
      hMsobjsDll_ = nullptr;
    }
  }

  logger_->log_trace("finish notifyStop");
}
// Releases msobjs.dll if it is still loaded (notifyStop() clears the handle
// when it manages to unload the library itself). The result of FreeLibrary is
// intentionally not inspected here.
ConsumeWindowsEventLog::~ConsumeWindowsEventLog() {
  if (hMsobjsDll_ != nullptr) {
    FreeLibrary(hMsobjsDll_);
  }
}
// Declares the processor's supported properties and relationships, using the
// class-level `Properties` and `Relationships` collections.
void ConsumeWindowsEventLog::initialize() {
setSupportedProperties(Properties);
setSupportedRelationships(Relationships);
}
// Parses the comma separated "key=value" style header property into a list of
// (wel::Metadata, display name) entries.
// Keys that do not name a wel::Metadata enumerator are logged as errors and
// skipped. A missing property yields an empty list.
wel::HeaderNames ConsumeWindowsEventLog::createHeaderNames(const std::optional<std::string>& event_header_property) const {
  wel::HeaderNames parsed_headers;
  if (!event_header_property.has_value()) {
    return parsed_headers;
  }

  const auto add_header = [this, &parsed_headers](std::string_view key, std::string_view value) {
    const auto metadata = magic_enum::enum_cast<wel::Metadata>(key);
    if (!metadata) {
      logger_->log_error("{} is an invalid key for the header map", key);
      return;
    }
    parsed_headers.emplace_back(*metadata, value);
  };
  wel::splitCommaSeparatedKeyValuePairs(*event_header_property, add_header);

  return parsed_headers;
}
// Reads the processor configuration from `context` and prepares the members
// that onTrigger() and processEvent() rely on (output format, event path,
// query, bookmark, buffer limit, provenance URI, ...).
// Throws Exception(PROCESSOR_EXCEPTION) when the context has no state manager;
// createBookmark() may throw as well.
void ConsumeWindowsEventLog::onSchedule(core::ProcessContext& context, core::ProcessSessionFactory&) {
state_manager_ = context.getStateManager();
if (state_manager_ == nullptr) {
throw Exception(PROCESSOR_EXCEPTION, "Failed to get StateManager");
}
// Plain property reads; the values are only stored here.
resolve_as_attributes_ = utils::parseBoolProperty(context, ResolveAsAttributes);
apply_identifier_function_ = utils::parseBoolProperty(context, IdentifierFunction);
header_delimiter_ = utils::parseOptionalProperty(context, EventHeaderDelimiter);
batch_commit_size_ = utils::parseU64Property(context, BatchCommitSize);
header_names_ = createHeaderNames(utils::parseOptionalProperty(context, EventHeader));
sid_matcher_ = wel::parseSidMatcher(utils::parseOptionalProperty(context, IdentifierMatcher));
output_format_ = utils::parseEnumProperty<wel::OutputFormat>(context, OutputFormatProperty);
json_format_ = utils::parseEnumProperty<wel::JsonFormat>(context, JsonFormatProperty);
// msobjs.dll is only needed for the `%%<number>` substitution, which processEvent()
// performs for every output format except Plaintext. Load it once; a failure is
// logged and leaves hMsobjsDll_ null, which makes the substitution a no-op.
if (output_format_ != wel::OutputFormat::Plaintext && !hMsobjsDll_) {
char systemDir[MAX_PATH];
if (GetSystemDirectory(systemDir, sizeof(systemDir))) {
hMsobjsDll_ = LoadLibrary((systemDir + std::string("\\msobjs.dll")).c_str());
if (!hMsobjsDll_) {
logger_->log_error("LoadLibrary failed due to {}", wel::getLastError());
}
} else {
logger_->log_error("GetSystemDirectory failed due to {}", wel::getLastError());
}
}
// The Channel property may name either a live channel or a saved log file;
// wel::EventPath reports which one it is.
path_ = wel::EventPath{utils::parseProperty(context, Channel)};
if (path_.kind() == wel::EventPath::Kind::FILE) {
logger_->log_debug("Using saved log file as log source");
} else {
logger_->log_debug("Using channel as log source");
}
// A missing Query property is treated as an empty query string.
std::string query = context.getProperty(Query).value_or("");
wstr_query_ = utils::to_wstring(query);
// path_ and wstr_query_ must be set before this point: createBookmark() reads both.
// An existing bookmark is kept across re-scheduling (notifyStop() is what resets it).
if (!bookmark_) {
bookmark_ = createBookmark(context);
}
max_buffer_size_ = utils::parseDataSizeProperty(context, MaxBufferSize);
logger_->log_debug("ConsumeWindowsEventLog: MaxBufferSize {}", max_buffer_size_);
cache_sid_lookups_ = utils::parseBoolProperty(context, CacheSidLookups);
logger_->log_debug("ConsumeWindowsEventLog: will{} cache SID to name lookups", cache_sid_lookups_ ? "" : " not");
// URI attached to the provenance "receive" event of every produced flow file.
provenanceUri_ = "winlog://" + computerName_ + "/" + path_.str() + "?" + query;
logger_->log_trace("Successfully configured CWEL");
}
// Creates the bookmark object for the configured path_ and wstr_query_.
// Throws Exception(PROCESS_SCHEDULE_EXCEPTION) when the BookmarkRootDirectory
// property is missing/empty, or when the freshly created bookmark reports
// itself as invalid.
std::unique_ptr<wel::Bookmark> ConsumeWindowsEventLog::createBookmark(const core::ProcessContext& context) const {
  std::string bookmark_root = context.getProperty(BookmarkRootDirectory).value_or("");
  if (bookmark_root.empty()) {
    logger_->log_error("State Directory is empty");
    throw Exception(PROCESS_SCHEDULE_EXCEPTION, "State Directory is empty");
  }

  bool include_old_events = utils::parseBoolProperty(context, ProcessOldEvents);
  auto new_bookmark = std::make_unique<wel::Bookmark>(
      path_, wstr_query_, bookmark_root, getUUID(), include_old_events, state_manager_, logger_);
  if (new_bookmark->isValid()) {
    return new_bookmark;
  }
  throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Bookmark is empty");
}
// Commits the session, begins a new state-manager transaction and then stores
// `bookmark_xml` as the new bookmark (a failure to store it is only logged).
// Returns false when the "success" connection is full, true otherwise.
bool ConsumeWindowsEventLog::commitAndSaveBookmark(const std::wstring &bookmark_xml, core::ProcessContext& context, core::ProcessSession& session) {
  const auto commit_timer = createTimer();
  session.commit();
  context.getStateManager()->beginTransaction();
  logger_->log_debug("ConsumeWindowsEventLog: commit took {}", commit_timer());

  if (!bookmark_->saveBookmarkXml(bookmark_xml)) {
    logger_->log_error("Failed to save bookmark xml");
  }

  const bool success_is_full = session.outgoingConnectionsFull("success");
  if (success_is_full) {
    logger_->log_debug("Outgoing success connection is full");
  }
  return !success_is_full;
}
// Pulls events one at a time from `event_query_results`, renders each one and
// hands it to createAndCommitFlowFile().
// Stops when batch_commit_size_ events were processed (0 means "no limit") or
// when EvtNext reports ERROR_NO_MORE_ITEMS.
// Returns the number of processed events together with the bookmark XML of the
// last processed event (empty if none was processed).
std::tuple<size_t, std::wstring> ConsumeWindowsEventLog::processEventLogs(core::ProcessSession& session, EVT_HANDLE event_query_results) {
  static constexpr DWORD timeout_milliseconds = 500;

  size_t events_done = 0;
  std::wstring latest_bookmark_xml;

  logger_->log_trace("Enumerating the events in the result set after the bookmarked event.");
  while (batch_commit_size_ == 0 || events_done < batch_commit_size_) {
    EVT_HANDLE raw_event{};
    DWORD returned_handles{};
    if (!EvtNext(event_query_results, 1, &raw_event, timeout_milliseconds, 0, &returned_handles)) {
      /* According to MS this iteration should only end when the return value is false AND
         the error code is NO_MORE_ITEMS. See the following page for further details:
         https://docs.microsoft.com/en-us/windows/win32/api/winevt/nf-winevt-evtnext */
      if (ERROR_NO_MORE_ITEMS == GetLastError()) {
        break;
      }
      logger_->log_error("Failed to get next event due to {}", wel::getLastError());
      continue;
    }

    // Owns the handle from here on, so every `continue` below releases it.
    wel::unique_evt_handle current_event{raw_event};
    logger_->log_trace("Successfully got the next event, performing event rendering");

    auto rendered_event = processEvent(current_event.get());
    if (!rendered_event) {
      logger_->log_error("error processing event: {}", rendered_event.error());
      continue;
    }

    auto event_bookmark_xml = bookmark_->getNewBookmarkXml(current_event.get());
    if (!event_bookmark_xml) {
      logger_->log_error("error getting the new bookmark: {}", event_bookmark_xml.error());
      continue;
    }

    latest_bookmark_xml = std::move(*event_bookmark_xml);
    ++events_done;
    createAndCommitFlowFile(*rendered_event, session);
  }
  logger_->log_trace("Finished enumerating events.");

  return std::make_tuple(events_done, std::move(latest_bookmark_xml));
}
// One processing round: query the event source, seek to the event after the
// bookmark, turn the following events into flow files, then commit and store
// the new bookmark.
// Yields when there is nothing to do, when a Windows API call fails, or when
// commitAndSaveBookmark() returns false. Overlapping invocations are rejected
// through on_trigger_mutex_.
void ConsumeWindowsEventLog::onTrigger(core::ProcessContext& context, core::ProcessSession& session) {
  std::unique_lock<std::mutex> trigger_lock(on_trigger_mutex_, std::try_to_lock);
  if (!trigger_lock.owns_lock()) {
    logger_->log_warn("processor was triggered before previous listing finished, configuration should be revised!");
    return;
  }

  if (!bookmark_) {
    logger_->log_debug("bookmark_ is null");
    context.yield();
    return;
  }

  logger_->log_trace("CWEL onTrigger");

  // The summary line is written on every exit path below.
  size_t processed_event_count = 0;
  const auto trigger_timer = createTimer();
  const auto log_summary_on_exit = gsl::finally([&]() {
    logger_->log_debug("processed {} Events in {}", processed_event_count, trigger_timer());
  });

  wel::unique_evt_handle query_results{EvtQuery(nullptr, path_.wstr().c_str(), wstr_query_.c_str(), path_.getQueryFlags())};
  if (!query_results) {
    logger_->log_error("EvtQuery failed due to {}", wel::getLastError());
    context.yield();
    return;
  }
  logger_->log_trace("Retrieved results in Channel: {} with Query: {}", utils::to_string(path_.wstr()), utils::to_string(wstr_query_));

  auto bookmark_handle = bookmark_->getBookmarkHandleFromXML();
  if (!bookmark_handle) {
    logger_->log_error("bookmark_handle is null, unrecoverable error!");
    // Without a bookmark every later trigger takes the early "bookmark_ is null" exit.
    bookmark_.reset();
    context.yield();
    return;
  }

  // Position the result set on the first event after the bookmarked one.
  if (!EvtSeek(query_results.get(), 1, bookmark_handle, 0, EvtSeekRelativeToBookmark)) {
    logger_->log_error("EvtSeek failed due to {}", wel::getLastError());
    context.yield();
    return;
  }

  refreshTimeZoneData();

  auto [event_count, bookmark_xml] = processEventLogs(session, query_results.get());
  processed_event_count = event_count;

  const bool nothing_processed = processed_event_count == 0;
  if (nothing_processed || !commitAndSaveBookmark(bookmark_xml, context, session)) {
    context.yield();
  }
}
// Returns the cached provider for `name`, creating and caching it on first use.
// A provider is cached even when EvtOpenPublisherMetadata fails (the failure is
// logged as a warning), so the open is attempted only once per name.
wel::WindowsEventLogProvider& ConsumeWindowsEventLog::getEventLogProvider(const std::string& name) {
  logger_->log_trace("Getting Event Log Provider corresponding to {}", name);

  if (const auto cached = providers_.find(name); cached != std::end(providers_)) {
    logger_->log_trace("Found cached event log provider");
    return cached->second;
  }

  const std::wstring wide_name = utils::to_wstring(name);
  auto publisher_metadata = EvtOpenPublisherMetadata(nullptr, wide_name.c_str(), nullptr, 0, 0);
  if (!publisher_metadata) {
    logger_->log_warn("EvtOpenPublisherMetadata failed due to {}", wel::getLastError());
  }

  const auto insertion = providers_.emplace(name, publisher_metadata);
  logger_->log_info("Created new Windows event log provider for '{}'. Number of cached providers: {}", name, providers_.size());
  return insertion.first->second;
}
// !!! Used a non-documented approach to resolve `%%` in XML via C:\Windows\System32\MsObjs.dll.
// Links which mention this approach:
// https://social.technet.microsoft.com/Forums/Windows/en-US/340632d1-60f0-4cc5-ad6f-f8c841107d0d/translate-value-1833quot-on-impersonationlevel-and-similar-values?forum=winservergen
// https://github.com/libyal/libevtx/blob/master/documentation/Windows%20XML%20Event%20Log%20(EVTX).asciidoc
// https://stackoverflow.com/questions/33498244/marshaling-a-message-table-resource
//
// Traverse the XML and, in the text of each node, look for `%%` followed by a number; the number is used as the key to look up the replacement value in C:\Windows\System32\MsObjs.dll.
// Replaces every `%%<decimal number>` placeholder found in the text of the nodes of
// `doc` with the message of that number from msobjs.dll (hMsobjsDll_).
// Does nothing when msobjs.dll is not loaded.
// Resolved messages - including failed lookups, stored as "" - are cached in
// xmlPercentageItemsResolutions_, so FormatMessage is called at most once per number.
// Placeholders whose number cannot be resolved are left in the text unchanged.
void ConsumeWindowsEventLog::substituteXMLPercentageItems(pugi::xml_document& doc) {
  if (!hMsobjsDll_) {
    return;
  }

  struct TreeWalker: public pugi::xml_tree_walker {
    TreeWalker(HMODULE hMsobjsDll, std::unordered_map<std::string, std::string>& xmlPercentageItemsResolutions, std::shared_ptr<core::logging::Logger> logger)
      : hMsobjsDll_(hMsobjsDll), xmlPercentageItemsResolutions_(xmlPercentageItemsResolutions), logger_(std::move(logger)) {
    }

    // Called by pugixml for every node; always returns true so the traversal continues.
    bool for_each(pugi::xml_node& node) override {
      static const std::string percentages = "%%";
      static const char* const decimalDigits = "0123456789";

      bool percentagesReplaced = false;
      std::string nodeText = node.text().get();

      for (size_t numberPos = 0; std::string::npos != (numberPos = nodeText.find(percentages, numberPos));) {
        const size_t placeholderPos = numberPos;
        numberPos += percentages.size();

        // Only an unbroken run of decimal digits right after "%%" counts as the number.
        // Measuring the run explicitly (instead of relying on what std::stoul accepts and
        // on the length of the re-formatted number) keeps leading zeros, whitespace or a
        // sign from corrupting the replaced range.
        size_t digitsEnd = nodeText.find_first_not_of(decimalDigits, numberPos);
        if (digitsEnd == std::string::npos) {
          digitsEnd = nodeText.size();
        }
        const size_t digitCount = digitsEnd - numberPos;
        if (digitCount == 0) {
          continue;
        }

        DWORD number{};
        try {
          number = std::stoul(nodeText.substr(numberPos, digitCount));
        } catch (const std::logic_error&) {
          // std::out_of_range (too many digits) or std::invalid_argument: not a usable
          // message id, leave the text as it is and continue after the digits.
          numberPos = digitsEnd;
          continue;
        }

        // Normalized form of the number (no leading zeros) is the cache key.
        const std::string key = std::to_string(number);

        std::string value;
        const auto it = xmlPercentageItemsResolutions_.find(key);
        if (it == xmlPercentageItemsResolutions_.end()) {
          LPTSTR pBuffer{};
          // With FORMAT_MESSAGE_ALLOCATE_BUFFER the function allocates the buffer and
          // stores its address through the lpBuffer argument, hence the pointer cast.
          if (FormatMessage(FORMAT_MESSAGE_ALLOCATE_BUFFER | FORMAT_MESSAGE_FROM_HMODULE | FORMAT_MESSAGE_IGNORE_INSERTS,
                            hMsobjsDll_,
                            number,
                            MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT),
                            reinterpret_cast<LPTSTR>(&pBuffer),
                            1024, nullptr)) {
            value = pBuffer;
            LocalFree(pBuffer);

            value = utils::string::trimRight(value);

            xmlPercentageItemsResolutions_.insert({key, value});
          } else {
            // Log first so that the reported error code is the one set by FormatMessage.
            logger_->log_error("FormatMessage failed due to {}. '{}' is not found in msobjs.dll.", wel::getLastError(), key);
            // Add "" to xmlPercentageItemsResolutions_ - don't need to call FormatMessage for this 'key' again.
            xmlPercentageItemsResolutions_.insert({key, ""});
          }
        } else {
          value = it->second;
        }

        if (!value.empty()) {
          nodeText.replace(placeholderPos, percentages.size() + digitCount, value);
          // Resume right after the inserted text: nothing following it is skipped and the
          // inserted text itself is never re-scanned.
          numberPos = placeholderPos + value.size();
          percentagesReplaced = true;
        }
      }

      if (percentagesReplaced) {
        node.text().set(nodeText.c_str());
      }

      return true;
    }

   private:
    HMODULE hMsobjsDll_;
    std::unordered_map<std::string, std::string>& xmlPercentageItemsResolutions_;
    std::shared_ptr<core::logging::Logger> logger_;
  } treeWalker(hMsobjsDll_, xmlPercentageItemsResolutions_, logger_);

  doc.traverse(treeWalker);
}
// Renders `event_handle` as XML and returns it converted with utils::to_string.
// A 4096-WCHAR stack buffer is tried first; if EvtRender reports that it is too
// small, a heap buffer of the size EvtRender asked for is used instead, unless
// that size exceeds max_buffer_size_, in which case the event is rejected.
// On failure the returned error string describes what went wrong.
nonstd::expected<std::string, std::string> ConsumeWindowsEventLog::renderEventAsXml(EVT_HANDLE event_handle) {
  logger_->log_trace("Rendering an event");

  WCHAR stack_storage[4096];
  // StackAwareDeleter is given the stack buffer's address - presumably so that it frees
  // only buffers other than that one.
  using BufferDeleter = utils::StackAwareDeleter<WCHAR, utils::FreeDeleter>;
  std::unique_ptr<WCHAR, BufferDeleter> render_buffer{stack_storage, BufferDeleter{stack_storage}};

  DWORD buffer_bytes = sizeof(stack_storage);
  DWORD bytes_used = 0;
  DWORD property_count = 0;

  const bool rendered_on_stack =
      EvtRender(nullptr, event_handle, EvtRenderEventXml, buffer_bytes, render_buffer.get(), &bytes_used, &property_count);
  if (!rendered_on_stack) {
    if (GetLastError() != ERROR_INSUFFICIENT_BUFFER) {
      return nonstd::make_unexpected(fmt::format("EvtRender failed due to {}", wel::getLastError()));
    }
    // bytes_used now holds the size EvtRender needs.
    if (bytes_used > max_buffer_size_) {
      return nonstd::make_unexpected(fmt::format("Dropping event because it couldn't be rendered within {} bytes.", max_buffer_size_));
    }

    buffer_bytes = bytes_used;
    render_buffer.reset(static_cast<LPWSTR>(malloc(buffer_bytes)));
    if (!render_buffer) {
      return nonstd::make_unexpected("malloc failed");
    }

    if (!EvtRender(nullptr, event_handle, EvtRenderEventXml, buffer_bytes, render_buffer.get(), &bytes_used, &property_count)) {
      return nonstd::make_unexpected(fmt::format("EvtRender failed due to {}", wel::getLastError()));
    }
  }

  logger_->log_trace("Event rendered with size {}", bytes_used);
  return utils::to_string(std::wstring{render_buffer.get()});
}
// Renders one event and produces the outputs selected by output_format_
// (plain text, XML, JSON) plus, when resolve_as_attributes_ is set, the field
// values collected by the metadata walker.
// Returns an error string when the event cannot be rendered or the rendered
// XML cannot be parsed.
nonstd::expected<wel::ProcessedEvent, std::string> ConsumeWindowsEventLog::processEvent(EVT_HANDLE event_handle) {
auto event_as_xml = renderEventAsXml(event_handle);
if (!event_as_xml)
return nonstd::make_unexpected(event_as_xml.error());
pugi::xml_document doc;
if (!doc.load_string(event_as_xml->c_str()))
return nonstd::make_unexpected("Invalid XML produced");
wel::ProcessedEvent result;
// this is a well known path.
std::string provider_name = doc.child("Event").child("System").child("Provider").attribute("Name").value();
wel::WindowsEventLogMetadataImpl metadata{getEventLogProvider(provider_name), event_handle};
wel::MetadataWalker walker{metadata, path_.str(), !resolve_as_attributes_, apply_identifier_function_, sid_matcher_, userIdToUsernameFunction()};
// resolve the event metadata
doc.traverse(walker);
logger_->log_trace("Finish doc traversing, performing writing...");
if (output_format_ == wel::OutputFormat::Plaintext || output_format_ == wel::OutputFormat::Both) {
logger_->log_trace("Writing event in plain text");
auto& provider = getEventLogProvider(provider_name);
auto event_message = provider.getEventMessage(event_handle);
if (event_message) {
// Apply the identifier replacements collected by the walker to the message text;
// entries with an empty key or value are skipped.
for (const auto& map_entry : walker.getIdentifiers()) {
if (map_entry.first.empty() || map_entry.second.empty()) {
continue;
}
utils::string::replaceAll(*event_message, map_entry.first, map_entry.second);
}
}
// The payload is labelled "Message" on success and "Error" when the message could
// not be obtained; in the latter case the error's message text is the payload.
std::string_view payload_name = event_message ? "Message" : "Error";
wel::WindowsEventLogHeader log_header(header_names_, header_delimiter_, payload_name.size());
result.plaintext = log_header.getEventHeader([&walker](wel::Metadata metadata) { return walker.getMetadata(metadata); });
result.plaintext += payload_name;
result.plaintext += log_header.getDelimiterFor(payload_name.size());
result.plaintext += event_message.has_value() ? *event_message : event_message.error().message();
logger_->log_trace("Finish writing in plain text");
}
// The `%%<number>` substitution modifies `doc` in place, so it has to happen before
// the XML / JSON outputs below are generated from `doc`.
if (output_format_ != wel::OutputFormat::Plaintext) {
substituteXMLPercentageItems(doc);
logger_->log_trace("Finish substituting %% in XML");
}
if (resolve_as_attributes_) {
result.matched_fields = walker.getFieldValues();
}
if (output_format_ == wel::OutputFormat::XML || output_format_ == wel::OutputFormat::Both) {
logger_->log_trace("Writing event in XML");
wel::XmlString writer;
doc.print(writer, "", pugi::format_raw); // no indentation or formatting
// The originally rendered XML is no longer needed; it is replaced by the
// serialized form of the (possibly modified) document.
event_as_xml = writer.xml_;
result.xml = std::move(*event_as_xml);
logger_->log_trace("Finish writing in XML");
}
if (output_format_ == wel::OutputFormat::JSON) {
switch (json_format_) {
case wel::JsonFormat::Raw: {
logger_->log_trace("Writing event in raw JSON");
result.json = wel::jsonToString(wel::toRawJSON(doc));
logger_->log_trace("Finish writing in raw JSON");
break;
}
case wel::JsonFormat::Simple: {
logger_->log_trace("Writing event in simple JSON");
result.json = wel::jsonToString(wel::toSimpleJSON(doc));
logger_->log_trace("Finish writing in simple JSON");
break;
}
case wel::JsonFormat::Flattened: {
logger_->log_trace("Writing event in flattened JSON");
result.json = wel::jsonToString(wel::toFlattenedJSON(doc));
logger_->log_trace("Finish writing in flattened JSON");
break;
}
default: {
// Any other json_format_ value is treated as a programming error.
gsl_Assert(false);
}
}
}
return result;
}
void ConsumeWindowsEventLog::refreshTimeZoneData() {
DYNAMIC_TIME_ZONE_INFORMATION tzinfo;
auto ret = GetDynamicTimeZoneInformation(&tzinfo);
std::wstring tzstr;
long tzbias = 0; // NOLINT long comes from WINDOWS API
bool dst = false;
switch (ret) {
case TIME_ZONE_ID_INVALID:
logger_->log_error("Failed to get timezone information!");
return; // Don't update members in case we cannot get data
case TIME_ZONE_ID_UNKNOWN:
tzstr = tzinfo.StandardName;
tzbias = tzinfo.Bias;
break;
case TIME_ZONE_ID_DAYLIGHT:
tzstr = tzinfo.DaylightName;
dst = true;
[[fallthrough]];
case TIME_ZONE_ID_STANDARD:
tzstr = tzstr.empty() ? tzinfo.StandardName : tzstr; // Use standard timezome name in case there is no daylight name or in case it's not DST
tzbias = tzinfo.Bias + (dst ? tzinfo.DaylightBias : tzinfo.StandardBias);
break;
}
timezone_name_ = utils::to_string(tzstr);
tzbias *= -1; // WinApi specifies UTC = localtime + bias, but we need offset from UTC
timezone_offset_ = fmt::format("{}{:02}:{:02}", tzbias >= 0 ? '+' : '-', std::abs(tzbias) / 60, std::abs(tzbias) % 60);
logger_->log_trace("Timezone name: {}, offset: {}", timezone_name_, timezone_offset_);
}
// Creates one flow file per requested output of `processed_event` and routes
// it to Success: XML and/or plain text for the XML / Plaintext / Both formats,
// JSON for the JSON format. Every flow file carries the non-empty matched
// fields, the MIME type and the current timezone name/offset as attributes,
// and a provenance "receive" event is reported for it.
void ConsumeWindowsEventLog::createAndCommitFlowFile(const wel::ProcessedEvent& processed_event, core::ProcessSession& session) const {
  const auto emit_flow_file = [&](const std::string& content, const std::string& mime_type) {
    auto flow_file = session.create();

    for (const auto& [field_name, field_value] : processed_event.matched_fields) {
      if (field_value.empty()) {
        continue;
      }
      session.putAttribute(*flow_file, field_name, field_value);
    }

    session.writeBuffer(flow_file, content);
    session.putAttribute(*flow_file, core::SpecialFlowAttribute::MIME_TYPE, mime_type);
    session.putAttribute(*flow_file, "timezone.name", timezone_name_);
    session.putAttribute(*flow_file, "timezone.offset", timezone_offset_);
    session.getProvenanceReporter()->receive(*flow_file, provenanceUri_, getUUIDStr(), "Consume windows event logs", 0ms);
    session.transfer(flow_file, Success);
  };

  const bool both_requested = output_format_ == wel::OutputFormat::Both;
  const bool xml_requested = both_requested || output_format_ == wel::OutputFormat::XML;
  const bool plaintext_requested = both_requested || output_format_ == wel::OutputFormat::Plaintext;
  const bool json_requested = output_format_ == wel::OutputFormat::JSON;

  if (xml_requested) {
    logger_->log_trace("Writing rendered XML to a flow file");
    emit_flow_file(processed_event.xml, "application/xml");
  }
  if (plaintext_requested) {
    logger_->log_trace("Writing rendered plain text to a flow file");
    emit_flow_file(processed_event.plaintext, "text/plain");
  }
  if (json_requested) {
    logger_->log_trace("Writing rendered {} JSON to a flow file", magic_enum::enum_name(json_format_));
    emit_flow_file(processed_event.json, "application/json");
  }
}
// Returns the function used to translate a user id into a user name: either
// utils::OsUtils::userIdToUsername directly, or - when cache_sid_lookups_ is
// set - a reference to a function-local static wel::LookupCacher wrapping it.
// Being a function-local static, that cacher is a single object shared by all
// callers of this method.
std::function<std::string(const std::string&)> ConsumeWindowsEventLog::userIdToUsernameFunction() const {
  static constexpr auto lookup = &utils::OsUtils::userIdToUsername;

  if (!cache_sid_lookups_) {
    return lookup;
  }

  static auto cached_lookup = wel::LookupCacher{lookup};
  return std::ref(cached_lookup);
}
// Registers ConsumeWindowsEventLog as a Processor resource via the REGISTER_RESOURCE macro.
// NOTE(review): presumably this is what makes the processor discoverable by class name - confirm in core/Resource.h.
REGISTER_RESOURCE(ConsumeWindowsEventLog, Processor);
} // namespace org::apache::nifi::minifi::processors