MINIFICPP-1139 - Implement better backpressure handling in ConsumeWindowsEventLog
Signed-off-by: Daniel Bakai <bakaid@apache.org>
This closes #741
diff --git a/extensions/windows-event-log/Bookmark.cpp b/extensions/windows-event-log/Bookmark.cpp
index ada6d16..e9ccd83 100644
--- a/extensions/windows-event-log/Bookmark.cpp
+++ b/extensions/windows-event-log/Bookmark.cpp
@@ -3,6 +3,7 @@
#include <direct.h>
#include "utils/file/FileUtils.h"
+#include "utils/ScopeGuard.h"
namespace org {
namespace apache {
@@ -10,53 +11,59 @@
namespace minifi {
namespace processors {
-Bookmark::Bookmark(const std::string& bookmarkRootDir, const std::string& uuid, std::shared_ptr<logging::Logger> logger)
+Bookmark::Bookmark(const std::wstring& channel, const std::wstring& query, const std::string& bookmarkRootDir, const std::string& uuid, std::shared_ptr<logging::Logger> logger)
:logger_(logger) {
if (!createUUIDDir(bookmarkRootDir, uuid, filePath_))
return;
filePath_ += "Bookmark.txt";
- std::wstring bookmarkXml;
- if (!getBookmarkXmlFromFile(bookmarkXml)) {
+ if (!getBookmarkXmlFromFile(bookmarkXml_)) {
return;
}
- if (bookmarkXml.empty()) {
- if (!(hBookmark_ = EvtCreateBookmark(0))) {
- logger_->log_error("!EvtCreateBookmark error: %d", GetLastError());
+ if (!bookmarkXml_.empty()) {
+ if (hBookmark_ = EvtCreateBookmark(bookmarkXml_.c_str())) {
+ ok_ = true;
return;
}
- hasBookmarkXml_ = false;
- } else {
- if (!(hBookmark_ = EvtCreateBookmark(bookmarkXml.c_str()))) {
- logger_->log_error("!EvtCreateBookmark error: %d bookmarkXml_ '%s'", GetLastError(), bookmarkXml.c_str());
+ LOG_LAST_ERROR(EvtCreateBookmark);
- // BookmarkXml can be corrupted - create hBookmark_, and create empty file.
- if (!(hBookmark_ = EvtCreateBookmark(0))) {
- logger_->log_error("!EvtCreateBookmark error: %d", GetLastError());
- return;
- }
-
- hasBookmarkXml_ = false;
-
- ok_ = createEmptyBookmarkXmlFile();
-
+ bookmarkXml_.clear();
+ if (!createEmptyBookmarkXmlFile()) {
return;
}
-
- hasBookmarkXml_ = true;
}
- ok_ = true;
+ if (!(hBookmark_ = EvtCreateBookmark(0))) {
+ LOG_LAST_ERROR(EvtCreateBookmark);
+ return;
+ }
+
+ const auto hEventResults = EvtQuery(0, channel.c_str(), query.c_str(), EvtQueryChannelPath);
+ if (!hEventResults) {
+ LOG_LAST_ERROR(EvtQuery);
+ return;
+ }
+ const utils::ScopeGuard guard_hEventResults([hEventResults]() { EvtClose(hEventResults); });
+
+ if (!EvtSeek(hEventResults, 0, 0, 0, EvtSeekRelativeToLast)) {
+ LOG_LAST_ERROR(EvtSeek);
+ return;
+ }
+
+ DWORD dwReturned{};
+ EVT_HANDLE hEvent{};
+ if (!EvtNext(hEventResults, 1, &hEvent, INFINITE, 0, &dwReturned)) {
+ LOG_LAST_ERROR(EvtNext);
+ return;
+ }
+
+ ok_ = saveBookmark(hEvent);
}
Bookmark::~Bookmark() {
- if (file_.is_open()) {
- file_.close();
- }
-
if (hBookmark_) {
EvtClose(hBookmark_);
}
@@ -66,11 +73,18 @@
return ok_;
}
-bool Bookmark::hasBookmarkXml() const {
- return hasBookmarkXml_;
-}
+EVT_HANDLE Bookmark::getBookmarkHandleFromXML() {
+ if (hBookmark_) {
+ EvtClose(hBookmark_);
+ hBookmark_ = 0;
+ }
-EVT_HANDLE Bookmark::bookmarkHandle() const {
+ hBookmark_ = EvtCreateBookmark(bookmarkXml_.c_str());
+ if (!(hBookmark_ = EvtCreateBookmark(bookmarkXml_.c_str()))) {
+ LOG_LAST_ERROR(EvtCreateBookmark);
+ return 0;
+ }
+
return hBookmark_;
}
@@ -88,7 +102,7 @@
bool Bookmark::getNewBookmarkXml(EVT_HANDLE hEvent, std::wstring& bookmarkXml) {
if (!EvtUpdateBookmark(hBookmark_, hEvent)) {
- logger_->log_error("!EvtUpdateBookmark error: %d.", GetLastError());
+ LOG_LAST_ERROR(EvtUpdateBookmark);
return false;
}
@@ -104,7 +118,7 @@
std::vector<wchar_t> buf(bufferSize / 2 + 1);
if (!EvtRender(0, hBookmark_, EvtRenderBookmark, bufferSize, &buf[0], &bufferUsed, &propertyCount)) {
- logger_->log_error("!EvtRender error: %d.", GetLastError());
+ LOG_LAST_ERROR(EvtRender);
return false;
}
@@ -112,8 +126,8 @@
return true;
}
- else if (ERROR_SUCCESS != (status = GetLastError())) {
- logger_->log_error("!EvtRender error: %d.", GetLastError());
+ if (ERROR_SUCCESS != (status = GetLastError())) {
+ LOG_LAST_ERROR(EvtRender);
return false;
}
}
@@ -121,7 +135,9 @@
return false;
}
-void Bookmark::saveBookmarkXml(std::wstring& bookmarkXml) {
+void Bookmark::saveBookmarkXml(const std::wstring& bookmarkXml) {
+ bookmarkXml_ = bookmarkXml;
+
// Write new bookmark over old and in the end write '!'. Then new bookmark is read until '!'. This is faster than truncate.
file_.seekp(std::ios::beg);
@@ -130,7 +146,6 @@
file_.flush();
}
-
bool Bookmark::createEmptyBookmarkXmlFile() {
if (file_.is_open()) {
file_.close();
@@ -205,7 +220,7 @@
// '!' should be at the end of bookmark.
auto pos = bookmarkXml.find(L'!');
if (std::wstring::npos == pos) {
- logger_->log_error("No '!' in bookmarXml '%s'", bookmarkXml.c_str());
+ logger_->log_error("No '!' in bookmarXml '%ls'", bookmarkXml.c_str());
bookmarkXml.clear();
return createEmptyBookmarkXmlFile();
}
diff --git a/extensions/windows-event-log/Bookmark.h b/extensions/windows-event-log/Bookmark.h
index 4435734..f89360b 100644
--- a/extensions/windows-event-log/Bookmark.h
+++ b/extensions/windows-event-log/Bookmark.h
@@ -14,30 +14,22 @@
namespace minifi {
namespace processors {
+#define LOG_LAST_ERROR(func) logger_->log_error("!"#func" error %x", GetLastError())
+
class Bookmark
{
public:
- Bookmark(const std::string& bookmarkRootDir, const std::string& uuid, std::shared_ptr<logging::Logger> logger);
+ Bookmark(const std::wstring& channel, const std::wstring& query, const std::string& bookmarkRootDir, const std::string& uuid, std::shared_ptr<logging::Logger> logger);
~Bookmark();
-
operator bool() const;
-
- bool hasBookmarkXml() const;
-
- EVT_HANDLE bookmarkHandle() const;
-
- bool saveBookmark(EVT_HANDLE hEvent);
-
+ EVT_HANDLE getBookmarkHandleFromXML();
bool getNewBookmarkXml(EVT_HANDLE hEvent, std::wstring& bookmarkXml);
-
- void saveBookmarkXml(std::wstring& bookmarkXml);
+ void saveBookmarkXml(const std::wstring& bookmarkXml);
private:
+ bool saveBookmark(EVT_HANDLE hEvent);
bool createEmptyBookmarkXmlFile();
-
bool createUUIDDir(const std::string& bookmarkRootDir, const std::string& uuid, std::string& dir);
-
std::string filePath(const std::string& uuid);
-
bool getBookmarkXmlFromFile(std::wstring& bookmarkXml);
private:
@@ -46,7 +38,7 @@
bool ok_{};
EVT_HANDLE hBookmark_{};
std::wfstream file_;
- bool hasBookmarkXml_{};
+ std::wstring bookmarkXml_;
};
} /* namespace processors */
diff --git a/extensions/windows-event-log/ConsumeWindowsEventLog.cpp b/extensions/windows-event-log/ConsumeWindowsEventLog.cpp
index a6832a9..cbcfbab 100644
--- a/extensions/windows-event-log/ConsumeWindowsEventLog.cpp
+++ b/extensions/windows-event-log/ConsumeWindowsEventLog.cpp
@@ -29,6 +29,7 @@
#include <iostream>
#include <memory>
#include <regex>
+#include <cinttypes>
#include "wel/MetadataWalker.h"
#include "wel/XMLString.h"
@@ -77,6 +78,7 @@
" This specifies the maximum size in bytes that the buffer will be allowed to grow to. (Limiting the maximum size of an individual Event XML.)")->
build());
+// !!! This property is obsolete since now subscription is not used, but leave since it might be is used already in config.yml.
core::Property ConsumeWindowsEventLog::InactiveDurationToReconnect(
core::PropertyBuilder::createProperty("Inactive Duration To Reconnect")->
isRequired(true)->
@@ -197,17 +199,6 @@
context->getProperty(EventHeaderDelimiter.getName(), header_delimiter_);
context->getProperty(BatchCommitSize.getName(), batch_commit_size_);
- std::string bookmarkDir;
- context->getProperty(BookmarkRootDirectory.getName(), bookmarkDir);
- if (bookmarkDir.empty()) {
- logger_->log_error("State Directory is empty");
- } else {
- pBookmark_ = std::make_unique<Bookmark>(bookmarkDir, getUUIDStr(), logger_);
- if (!*pBookmark_) {
- pBookmark_.reset();
- }
- }
-
std::string header;
context->getProperty(EventHeader.getName(), header);
@@ -220,8 +211,7 @@
if (!insertHeaderName(header_names_, key, value)) {
logger_->log_error("%s is an invalid key for the header map", key);
}
- }
- else if (splitKeyAndValue.size() == 1) {
+ } else if (splitKeyAndValue.size() == 1) {
auto key = utils::StringUtils::trim(splitKeyAndValue.at(0));
if (!insertHeaderName(header_names_, key, "")) {
logger_->log_error("%s is an invalid key for the header map", key);
@@ -241,28 +231,45 @@
if (GetSystemDirectory(systemDir, sizeof(systemDir))) {
hMsobjsDll_ = LoadLibrary((systemDir + std::string("\\msobjs.dll")).c_str());
if (!hMsobjsDll_) {
- logger_->log_error("!LoadLibrary error %x", GetLastError());
+ LOG_LAST_ERROR(LoadLibrary);
}
} else {
- logger_->log_error("!GetSystemDirectory error %x", GetLastError());
+ LOG_LAST_ERROR(GetSystemDirectory);
}
}
- if (subscriptionHandle_) {
- logger_->log_error("Processor already subscribed to Event Log, expected cleanup to unsubscribe.");
- } else {
- sessionFactory_ = sessionFactory;
+ context->getProperty(Channel.getName(), channel_);
+ wstrChannel_ = std::wstring(channel_.begin(), channel_.end());
- subscribe(context);
- }
-}
+ std::string query;
+ context->getProperty(Query.getName(), query);
+ wstrQuery_ = std::wstring(query.begin(), query.end());
-void ConsumeWindowsEventLog::onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) {
- if (!subscriptionHandle_) {
- if (!subscribe(context)) {
- context->yield();
+ if (!pBookmark_) {
+ std::string bookmarkDir;
+ context->getProperty(BookmarkRootDirectory.getName(), bookmarkDir);
+ if (bookmarkDir.empty()) {
+ logger_->log_error("State Directory is empty");
return;
}
+ pBookmark_ = std::make_unique<Bookmark>(wstrChannel_, wstrQuery_, bookmarkDir, getUUIDStr(), logger_);
+ if (!*pBookmark_) {
+ pBookmark_.reset();
+ return;
+ }
+ }
+
+ context->getProperty(MaxBufferSize.getName(), maxBufferSize_);
+ logger_->log_debug("ConsumeWindowsEventLog: maxBufferSize_ %" PRIu64, maxBufferSize_);
+
+ provenanceUri_ = "winlog://" + computerName_ + "/" + channel_ + "?" + query;
+}
+
+
+void ConsumeWindowsEventLog::onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) {
+ if (!pBookmark_) {
+ context->yield();
+ return;
}
std::unique_lock<std::mutex> lock(onTriggerMutex_, std::try_to_lock);
@@ -271,18 +278,87 @@
return;
}
- const auto flowFileCount = processQueue(session);
-
- const auto now = GetTickCount64();
-
- if (flowFileCount > 0) {
- lastActivityTimestamp_ = now;
- }
- else if (inactiveDurationToReconnect_ > 0) {
- if ((now - lastActivityTimestamp_) > inactiveDurationToReconnect_) {
- logger_->log_info("Exceeds configured 'inactive duration to reconnect' %lld ms. Unsubscribe to reconnect..", inactiveDurationToReconnect_);
- unsubscribe();
+ struct TimeDiff {
+ auto operator()() const {
+ return int64_t{ std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now() - time_).count() };
}
+ const decltype(std::chrono::steady_clock::now()) time_ = std::chrono::steady_clock::now();
+ };
+
+ const auto commitAndSaveBookmark = [&] (const std::wstring& bookmarkXml) {
+ const TimeDiff timeDiff;
+ session->commit();
+ logger_->log_debug("processQueue commit took %" PRId64 " ms", timeDiff());
+
+ pBookmark_->saveBookmarkXml(bookmarkXml);
+
+ if (session->outgoingConnectionsFull("success")) {
+ logger_->log_debug("outgoingConnectionsFull");
+ return false;
+ }
+
+ return true;
+ };
+
+ size_t eventCount = 0;
+ const TimeDiff timeDiff;
+ utils::ScopeGuard timeGuard([&]() {
+ logger_->log_debug("processed %zu Events in %" PRId64 " ms", eventCount, timeDiff());
+ });
+
+ size_t commitAndSaveBookmarkCount = 0;
+ std::wstring bookmarkXml;
+
+ const auto hEventResults = EvtQuery(0, wstrChannel_.c_str(), wstrQuery_.c_str(), EvtQueryChannelPath);
+ if (!hEventResults) {
+ LOG_LAST_ERROR(EvtQuery);
+ return;
+ }
+ const utils::ScopeGuard guard_hEventResults([hEventResults]() { EvtClose(hEventResults); });
+
+ auto hBookmark = pBookmark_->getBookmarkHandleFromXML();
+ if (!hBookmark) {
+ // Unrecovarable error.
+ pBookmark_.reset();
+ return;
+ }
+
+ if (!EvtSeek(hEventResults, 1, hBookmark, 0, EvtSeekRelativeToBookmark)) {
+ LOG_LAST_ERROR(EvtSeek);
+ return;
+ }
+
+ // Enumerate the events in the result set after the bookmarked event.
+ while (true) {
+ EVT_HANDLE hEvent{};
+ DWORD dwReturned{};
+ if (!EvtNext(hEventResults, 1, &hEvent, INFINITE, 0, &dwReturned)) {
+ if (ERROR_NO_MORE_ITEMS != GetLastError()) {
+ LOG_LAST_ERROR(EvtNext);
+ }
+ break;
+ }
+ const utils::ScopeGuard guard_hEvent([hEvent]() { EvtClose(hEvent); });
+
+ EventRender eventRender;
+ std::wstring newBookmarkXml;
+ if (createEventRender(hEvent, eventRender) && pBookmark_->getNewBookmarkXml(hEvent, newBookmarkXml)) {
+ bookmarkXml = std::move(newBookmarkXml);
+ eventCount++;
+ putEventRenderFlowFileToSession(eventRender, *session);
+
+ if (batch_commit_size_ != 0U && (eventCount % batch_commit_size_ == 0)) {
+ if (!commitAndSaveBookmark(bookmarkXml)) {
+ return;
+ }
+
+ commitAndSaveBookmarkCount = eventCount;
+ }
+ }
+ }
+
+ if (eventCount > commitAndSaveBookmarkCount) {
+ commitAndSaveBookmark(bookmarkXml);
}
}
@@ -301,7 +377,6 @@
return providers_[name];
}
-
// !!! Used a non-documented approach to resolve `%%` in XML via C:\Windows\System32\MsObjs.dll.
// Links which mention this approach:
// https://social.technet.microsoft.com/Forums/Windows/en-US/340632d1-60f0-4cc5-ad6f-f8c841107d0d/translate-value-1833quot-on-impersonationlevel-and-similar-values?forum=winservergen
@@ -362,7 +437,7 @@
// Add "" to xmlPercentageItemsResolutions_ - don't need to call FormaMessage for this 'key' again.
xmlPercentageItemsResolutions_.insert({key, ""});
- logger_->log_error("!FormatMessage error: %d. '%s' is not found in msobjs.dll.", GetLastError(), key.c_str());
+ logger_->log_error("!FormatMessage error: %x. '%s' is not found in msobjs.dll.", GetLastError(), key.c_str());
}
} else {
value = it->second;
@@ -391,322 +466,122 @@
doc.traverse(treeWalker);
}
-void ConsumeWindowsEventLog::processEvent(EVT_HANDLE hEvent) {
+bool ConsumeWindowsEventLog::createEventRender(EVT_HANDLE hEvent, EventRender& eventRender) {
DWORD size = 0;
DWORD used = 0;
DWORD propertyCount = 0;
- if (!EvtRender(NULL, hEvent, EvtRenderEventXml, size, 0, &used, &propertyCount)) {
- if (ERROR_INSUFFICIENT_BUFFER == GetLastError()) {
- if (used > maxBufferSize_) {
- logger_->log_error("Dropping event %x because it couldn't be rendered within %ll bytes.", hEvent, maxBufferSize_);
- return;
- }
-
- size = used;
- std::vector<wchar_t> buf(size / 2 + 1);
- if (!EvtRender(NULL, hEvent, EvtRenderEventXml, size, &buf[0], &used, &propertyCount)) {
- logger_->log_error("!EvtRender error: %d.", GetLastError());
- return;
- }
-
- std::string xml = wel::to_string(&buf[0]);
-
- pugi::xml_document doc;
- pugi::xml_parse_result result = doc.load_string(xml.c_str());
-
- if (!result) {
- logger_->log_error("Invalid XML produced");
- return;
- }
- // this is a well known path.
- std::string providerName = doc.child("Event").child("System").child("Provider").attribute("Name").value();
- wel::MetadataWalker walker(getEventLogHandler(providerName).getMetadata(), channel_, hEvent, !resolve_as_attributes_, apply_identifier_function_, regex_);
-
- // resolve the event metadata
- doc.traverse(walker);
-
- EventRender renderedData;
-
- if (writePlainText_) {
- auto handler = getEventLogHandler(providerName);
- auto message = handler.getEventMessage(hEvent);
-
- if (!message.empty()) {
-
- for (const auto &mapEntry : walker.getIdentifiers()) {
- // replace the identifiers with their translated strings.
- utils::StringUtils::replaceAll(message, mapEntry.first, mapEntry.second);
- }
- wel::WindowsEventLogHeader log_header(header_names_);
- // set the delimiter
- log_header.setDelimiter(header_delimiter_);
- // render the header.
- renderedData.rendered_text_ = log_header.getEventHeader(&walker);
- renderedData.rendered_text_ += "Message" + header_delimiter_ + " ";
- renderedData.rendered_text_ += message;
- }
- }
-
- if (writeXML_) {
- substituteXMLPercentageItems(doc);
-
- if (resolve_as_attributes_) {
- renderedData.matched_fields_ = walker.getFieldValues();
- }
-
- wel::XmlString writer;
- doc.print(writer, "", pugi::format_raw); // no indentation or formatting
- xml = writer.xml_;
-
- renderedData.text_ = std::move(xml);
- }
-
- if (pBookmark_) {
- std::wstring bookmarkXml;
- if (pBookmark_->getNewBookmarkXml(hEvent, bookmarkXml)) {
- renderedData.bookmarkXml_ = bookmarkXml;
- }
- }
-
- listRenderedData_.enqueue(std::move(renderedData));
- }
- }
-}
-
-bool ConsumeWindowsEventLog::processEventsAfterBookmark(EVT_HANDLE hEventResults, const std::wstring& channel, const std::wstring& query) {
- if (!EvtSeek(hEventResults, 1, pBookmark_->bookmarkHandle(), 0, EvtSeekRelativeToBookmark)) {
- logger_->log_error("!EvtSeek error %d.", GetLastError());
+ EvtRender(NULL, hEvent, EvtRenderEventXml, size, 0, &used, &propertyCount);
+ if (ERROR_INSUFFICIENT_BUFFER != GetLastError()) {
+ LOG_LAST_ERROR(EvtRender);
return false;
}
- // Enumerate the events in the result set after the bookmarked event.
- while (true) {
- EVT_HANDLE hEvent{};
- DWORD dwReturned{};
- if (!EvtNext(hEventResults, 1, &hEvent, INFINITE, 0, &dwReturned)) {
- DWORD status = ERROR_SUCCESS;
- if (ERROR_NO_MORE_ITEMS != (status = GetLastError())) {
- logger_->log_error("!EvtNext error %d.", status);
+ if (used > maxBufferSize_) {
+ logger_->log_error("Dropping event because it couldn't be rendered within %" PRIu64 " bytes.", maxBufferSize_);
+ return false;
+ }
+
+ size = used;
+ std::vector<wchar_t> buf(size / 2 + 1);
+ if (!EvtRender(NULL, hEvent, EvtRenderEventXml, size, &buf[0], &used, &propertyCount)) {
+ LOG_LAST_ERROR(EvtRender);
+ return false;
+ }
+
+ std::string xml = wel::to_string(&buf[0]);
+
+ pugi::xml_document doc;
+ pugi::xml_parse_result result = doc.load_string(xml.c_str());
+
+ if (!result) {
+ logger_->log_error("Invalid XML produced");
+ return false;
+ }
+
+ // this is a well known path.
+ std::string providerName = doc.child("Event").child("System").child("Provider").attribute("Name").value();
+ wel::MetadataWalker walker(getEventLogHandler(providerName).getMetadata(), channel_, hEvent, !resolve_as_attributes_, apply_identifier_function_, regex_);
+
+ // resolve the event metadata
+ doc.traverse(walker);
+
+ if (writePlainText_) {
+ auto handler = getEventLogHandler(providerName);
+ auto message = handler.getEventMessage(hEvent);
+
+ if (!message.empty()) {
+
+ for (const auto &mapEntry : walker.getIdentifiers()) {
+ // replace the identifiers with their translated strings.
+ utils::StringUtils::replaceAll(message, mapEntry.first, mapEntry.second);
}
- break;
+ wel::WindowsEventLogHeader log_header(header_names_);
+ // set the delimiter
+ log_header.setDelimiter(header_delimiter_);
+ // render the header.
+ eventRender.rendered_text_ = log_header.getEventHeader(&walker);
+ eventRender.rendered_text_ += "Message" + header_delimiter_ + " ";
+ eventRender.rendered_text_ += message;
+ }
+ }
+
+ if (writeXML_) {
+ substituteXMLPercentageItems(doc);
+
+ if (resolve_as_attributes_) {
+ eventRender.matched_fields_ = walker.getFieldValues();
}
- processEvent(hEvent);
+ wel::XmlString writer;
+ doc.print(writer, "", pugi::format_raw); // no indentation or formatting
+ xml = writer.xml_;
- EvtClose(hEvent);
+ eventRender.text_ = std::move(xml);
}
return true;
}
-
-bool ConsumeWindowsEventLog::subscribe(const std::shared_ptr<core::ProcessContext> &context) {
- context->getProperty(Channel.getName(), channel_);
- context->getProperty(Query.getName(), query_);
-
- context->getProperty(MaxBufferSize.getName(), maxBufferSize_);
- logger_->log_debug("ConsumeWindowsEventLog: maxBufferSize_ %lld", maxBufferSize_);
-
- provenanceUri_ = "winlog://" + computerName_ + "/" + channel_ + "?" + query_;
-
- std::string strInactiveDurationToReconnect;
- context->getProperty(InactiveDurationToReconnect.getName(), strInactiveDurationToReconnect);
-
- // Get 'inactiveDurationToReconnect_'.
- core::TimeUnit unit;
- if (core::Property::StringToTime(strInactiveDurationToReconnect, inactiveDurationToReconnect_, unit) &&
- core::Property::ConvertTimeUnitToMS(inactiveDurationToReconnect_, unit, inactiveDurationToReconnect_)) {
- logger_->log_info("inactiveDurationToReconnect: [%lld] ms", inactiveDurationToReconnect_);
- }
-
- if (!pBookmark_) {
- logger_->log_error("!pBookmark_");
- return false;
- }
-
- auto channel = std::wstring(channel_.begin(), channel_.end());
- auto query = std::wstring(query_.begin(), query_.end());
-
- do {
- auto hEventResults = EvtQuery(0, channel.c_str(), query.c_str(), EvtQueryChannelPath);
- if (!hEventResults) {
- logger_->log_error("!EvtQuery error: %d.", GetLastError());
- // Consider it as a serious error.
- return false;
- }
- const utils::ScopeGuard guard_hEventResults([hEventResults]() { EvtClose(hEventResults); });
-
- if (pBookmark_->hasBookmarkXml()) {
- if (!processEventsAfterBookmark(hEventResults, channel, query)) {
- break;
- }
- } else {
- // Seek to the last event in the hEventResults.
- if (!EvtSeek(hEventResults, 0, 0, 0, EvtSeekRelativeToLast)) {
- logger_->log_error("!EvtSeek error: %d.", GetLastError());
- break;
- }
-
- DWORD dwReturned{};
- EVT_HANDLE hEvent{};
- if (!EvtNext(hEventResults, 1, &hEvent, INFINITE, 0, &dwReturned)) {
- logger_->log_error("!EvtNext error: %d.", GetLastError());
- break;
- }
-
- pBookmark_->saveBookmark(hEvent);
- }
- } while (false);
-
- subscriptionHandle_ = EvtSubscribe(
- NULL,
- NULL,
- channel.c_str(),
- query.c_str(),
- NULL,
- this,
- [](EVT_SUBSCRIBE_NOTIFY_ACTION action, PVOID pContext, EVT_HANDLE hEvent)
- {
- auto pConsumeWindowsEventLog = static_cast<ConsumeWindowsEventLog*>(pContext);
-
- auto& logger = pConsumeWindowsEventLog->logger_;
-
- if (action == EvtSubscribeActionError) {
- if (ERROR_EVT_QUERY_RESULT_STALE == (DWORD)hEvent) {
- logger->log_error("Received missing event notification. Consider triggering processor more frequently or increasing queue size.");
- } else {
- logger->log_error("Received the following Win32 error: %x", hEvent);
- }
- } else if (action == EvtSubscribeActionDeliver) {
- pConsumeWindowsEventLog->processEvent(hEvent);
- }
-
- return 0UL;
- },
- EvtSubscribeToFutureEvents | EvtSubscribeStrict);
-
- if (!subscriptionHandle_) {
- logger_->log_error("Unable to subscribe with provided parameters, received the following error code: %d", GetLastError());
- return false;
- }
-
- lastActivityTimestamp_ = GetTickCount64();
-
- return true;
-}
-
-void ConsumeWindowsEventLog::unsubscribe()
+void ConsumeWindowsEventLog::putEventRenderFlowFileToSession(const EventRender& eventRender, core::ProcessSession& session)
{
- if (subscriptionHandle_) {
- EvtClose(subscriptionHandle_);
- subscriptionHandle_ = 0;
- }
-}
-
-int ConsumeWindowsEventLog::processQueue(const std::shared_ptr<core::ProcessSession> &session)
-{
- struct WriteCallback: public OutputStreamCallback {
+ struct WriteCallback : public OutputStreamCallback {
WriteCallback(const std::string& str)
- : data_(str.c_str()), size_(str.length()) {
+ : str_(str) {
}
int64_t process(std::shared_ptr<io::BaseStream> stream) {
- return stream->writeData((uint8_t*)data_, size_);
+ return stream->writeData(reinterpret_cast<uint8_t*>(const_cast<char*>(str_.c_str())), str_.size());
}
- std::string str_;
- const char * data_;
- const size_t size_;
+ const std::string& str_;
};
- int flowFileCount = 0;
+ if (writeXML_) {
+ auto flowFile = session.create();
- auto before_time = std::chrono::high_resolution_clock::now();
- utils::ScopeGuard timeGuard([&](){
- logger_->log_debug("processQueue processed %d Events in %llu ms",
- flowFileCount,
- std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::high_resolution_clock::now() - before_time).count());
- });
-
- bool commitAndSaveBookmark = false;
-
- EventRender evt;
- while (listRenderedData_.try_dequeue(evt)) {
- commitAndSaveBookmark = true;
-
- if (writeXML_) {
- auto flowFile = session->create();
-
- session->write(flowFile, &WriteCallback(evt.text_));
- for (const auto &fieldMapping : evt.matched_fields_) {
- if (!fieldMapping.second.empty()) {
- session->putAttribute(flowFile, fieldMapping.first, fieldMapping.second);
- }
+ session.write(flowFile, &WriteCallback(eventRender.text_));
+ for (const auto &fieldMapping : eventRender.matched_fields_) {
+ if (!fieldMapping.second.empty()) {
+ session.putAttribute(flowFile, fieldMapping.first, fieldMapping.second);
}
- session->putAttribute(flowFile, FlowAttributeKey(MIME_TYPE), "application/xml");
- session->getProvenanceReporter()->receive(flowFile, provenanceUri_, getUUIDStr(), "Consume windows event logs",
- 0);
- session->transfer(flowFile, Success);
}
-
- if (writePlainText_) {
- auto flowFile = session->create();
-
- session->write(flowFile, &WriteCallback(evt.rendered_text_));
- session->putAttribute(flowFile, FlowAttributeKey(MIME_TYPE), "text/plain");
- session->getProvenanceReporter()->receive(flowFile, provenanceUri_, getUUIDStr(), "Consume windows event logs",
- 0);
- session->transfer(flowFile, Success);
- }
-
- flowFileCount++;
-
- if (batch_commit_size_ != 0U && (flowFileCount % batch_commit_size_ == 0)) {
- auto before_commit = std::chrono::high_resolution_clock::now();
- session->commit();
- logger_->log_debug("processQueue commit took %llu ms",
- std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::high_resolution_clock::now() - before_commit).count());
-
- if (pBookmark_) {
- pBookmark_->saveBookmarkXml(evt.bookmarkXml_);
- }
- if(session->outgoingConnectionsFull("success")) {
- return flowFileCount; // Enough flowfiles there
- }
-
- commitAndSaveBookmark = false;
- }
+ session.putAttribute(flowFile, FlowAttributeKey(MIME_TYPE), "application/xml");
+ session.getProvenanceReporter()->receive(flowFile, provenanceUri_, getUUIDStr(), "Consume windows event logs", 0);
+ session.transfer(flowFile, Success);
}
- if (commitAndSaveBookmark) {
- session->commit();
+ if (writePlainText_) {
+ auto flowFile = session.create();
- if (pBookmark_) {
- pBookmark_->saveBookmarkXml(evt.bookmarkXml_);
- }
+ session.write(flowFile, &WriteCallback(eventRender.rendered_text_));
+ session.putAttribute(flowFile, FlowAttributeKey(MIME_TYPE), "text/plain");
+ session.getProvenanceReporter()->receive(flowFile, provenanceUri_, getUUIDStr(), "Consume windows event logs", 0);
+ session.transfer(flowFile, Success);
}
-
- return flowFileCount;
}
void ConsumeWindowsEventLog::notifyStop()
{
- unsubscribe();
-
- if (listRenderedData_.size_approx() != 0) {
- auto session = sessionFactory_->createSession();
- if (session) {
- logger_->log_info("Finishing processing leftover events");
-
- processQueue(session);
- } else {
- logger_->log_error(
- "Stopping the processor but there is no ProcessSessionFactory stored and there are messages in the internal queue. "
- "Removing the processor now will clear the queue but will result in DATA LOSS. This is normally due to starting the processor, "
- "receiving events and stopping before the onTrigger happens. The messages in the internal queue cannot finish processing until "
- "the processor is triggered to run.");
- }
- }
}
void ConsumeWindowsEventLog::LogWindowsError()
@@ -723,7 +598,7 @@
(LPTSTR)&lpMsg,
0, NULL);
- logger_->log_error("Error %d: %s\n", (int)error_id, (char *)lpMsg);
+ logger_->log_error("Error %x: %s\n", (int)error_id, (char *)lpMsg);
LocalFree(lpMsg);
}
diff --git a/extensions/windows-event-log/ConsumeWindowsEventLog.h b/extensions/windows-event-log/ConsumeWindowsEventLog.h
index 2314611..4085b76 100644
--- a/extensions/windows-event-log/ConsumeWindowsEventLog.h
+++ b/extensions/windows-event-log/ConsumeWindowsEventLog.h
@@ -46,7 +46,6 @@
std::map<std::string, std::string> matched_fields_;
std::string text_;
std::string rendered_text_;
- std::wstring bookmarkXml_;
};
class Bookmark;
@@ -101,14 +100,11 @@
protected:
- bool subscribe(const std::shared_ptr<core::ProcessContext> &context);
- void unsubscribe();
- int processQueue(const std::shared_ptr<core::ProcessSession> &session);
+ void putEventRenderFlowFileToSession(const EventRender& eventRender, core::ProcessSession& session);
wel::WindowsEventLogHandler getEventLogHandler(const std::string & name);
bool insertHeaderName(wel::METADATA_NAMES &header, const std::string &key, const std::string &value);
void LogWindowsError();
- void processEvent(EVT_HANDLE eventHandle);
- bool processEventsAfterBookmark(EVT_HANDLE hEventResults, const std::wstring& channel, const std::wstring& query);
+ bool createEventRender(EVT_HANDLE eventHandle, EventRender& eventRender);
void substituteXMLPercentageItems(pugi::xml_document& doc);
static constexpr const char * const XML = "XML";
@@ -120,19 +116,16 @@
wel::METADATA_NAMES header_names_;
std::string header_delimiter_;
std::string channel_;
- std::string query_;
+ std::wstring wstrChannel_;
+ std::wstring wstrQuery_;
std::shared_ptr<logging::Logger> logger_;
std::string regex_;
bool resolve_as_attributes_;
bool apply_identifier_function_;
- moodycamel::ConcurrentQueue<EventRender> listRenderedData_;
std::string provenanceUri_;
std::string computerName_;
- int64_t inactiveDurationToReconnect_{};
- EVT_HANDLE subscriptionHandle_{};
uint64_t maxBufferSize_{};
DWORD lastActivityTimestamp_{};
- std::shared_ptr<core::ProcessSessionFactory> sessionFactory_;
std::mutex cache_mutex_;
std::map<std::string, wel::WindowsEventLogHandler > providers_;
uint64_t batch_commit_size_;