MINIFICPP-1086 - ConsumeWindowsEventLog fixes
diff --git a/extensions/windows-event-log/ConsumeWindowsEventLog.cpp b/extensions/windows-event-log/ConsumeWindowsEventLog.cpp
index 75971fa..77f0f90 100644
--- a/extensions/windows-event-log/ConsumeWindowsEventLog.cpp
+++ b/extensions/windows-event-log/ConsumeWindowsEventLog.cpp
@@ -34,6 +34,7 @@
#include "wel/XMLString.h"
#include "wel/UnicodeConversion.h"
+#include "utils/ScopeGuard.h"
#include "io/DataStream.h"
#include "core/ProcessContext.h"
#include "core/ProcessSession.h"
@@ -131,10 +132,17 @@
withDescription("Set the output format type. In case \'Both\' is selected the processor generates two flow files for every event captured")->
build());
+core::Property ConsumeWindowsEventLog::BatchCommitSize(
+ core::PropertyBuilder::createProperty("Batch Commit Size")->
+ isRequired(false)->
+ withDefaultValue<uint64_t>(1000U)->
+ withDescription("Maximum number of Events to consume and create to Flow Files from before committing.")->
+ build());
+
core::Relationship ConsumeWindowsEventLog::Success("success", "Relationship for successfully consumed events.");
ConsumeWindowsEventLog::ConsumeWindowsEventLog(const std::string& name, utils::Identifier uuid)
- : core::Processor(name, uuid), logger_(logging::LoggerFactory<ConsumeWindowsEventLog>::getLogger()), apply_identifier_function_(false) {
+ : core::Processor(name, uuid), logger_(logging::LoggerFactory<ConsumeWindowsEventLog>::getLogger()), apply_identifier_function_(false), batch_commit_size_(0U) {
char buff[MAX_COMPUTERNAME_LENGTH + 1];
DWORD size = sizeof(buff);
@@ -153,7 +161,7 @@
void ConsumeWindowsEventLog::initialize() {
//! Set the supported properties
- setSupportedProperties({Channel, Query, MaxBufferSize, InactiveDurationToReconnect, IdentifierMatcher, IdentifierFunction, ResolveAsAttributes, EventHeaderDelimiter, EventHeader, OutputFormat});
+ setSupportedProperties({Channel, Query, MaxBufferSize, InactiveDurationToReconnect, IdentifierMatcher, IdentifierFunction, ResolveAsAttributes, EventHeaderDelimiter, EventHeader, OutputFormat, BatchCommitSize});
//! Set the supported relationships
setSupportedRelationships({Success});
@@ -175,6 +183,7 @@
context->getProperty(ResolveAsAttributes.getName(), resolve_as_attributes_);
context->getProperty(IdentifierFunction.getName(), apply_identifier_function_);
context->getProperty(EventHeaderDelimiter.getName(), header_delimiter_);
+ context->getProperty(BatchCommitSize.getName(), batch_commit_size_);
std::string header;
context->getProperty(EventHeader.getName(), header);
@@ -407,6 +416,13 @@
int flowFileCount = 0;
+ auto before_time = std::chrono::high_resolution_clock::now();
+ utils::ScopeGuard timeGuard([&](){
+ logger_->log_debug("processQueue processed %d Events in %llu ms",
+ flowFileCount,
+ std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::high_resolution_clock::now() - before_time).count());
+ });
+
EventRender evt;
while (listRenderedData_.try_dequeue(evt)) {
if (writeXML_) {
@@ -435,6 +451,13 @@
}
flowFileCount++;
+
+ if (batch_commit_size_ != 0U && (flowFileCount % batch_commit_size_ == 0)) {
+ auto before_commit = std::chrono::high_resolution_clock::now();
+ session->commit();
+ logger_->log_debug("processQueue commit took %llu ms",
+ std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::high_resolution_clock::now() - before_commit).count());
+ }
}
return flowFileCount;
diff --git a/extensions/windows-event-log/ConsumeWindowsEventLog.h b/extensions/windows-event-log/ConsumeWindowsEventLog.h
index d03d0a7..ea8bef1 100644
--- a/extensions/windows-event-log/ConsumeWindowsEventLog.h
+++ b/extensions/windows-event-log/ConsumeWindowsEventLog.h
@@ -77,6 +77,7 @@
static core::Property EventHeaderDelimiter;
static core::Property EventHeader;
static core::Property OutputFormat;
+ static core::Property BatchCommitSize;
//! Supported Relationships
static core::Relationship Success;
@@ -132,6 +133,8 @@
std::mutex cache_mutex_;
std::map<std::string, wel::WindowsEventLogHandler > providers_;
+ uint64_t batch_commit_size_;
+
bool writeXML_;
bool writePlainText_;
};
diff --git a/extensions/windows-event-log/wel/WindowsEventLog.cpp b/extensions/windows-event-log/wel/WindowsEventLog.cpp
index 7927f3d..3e67852 100644
--- a/extensions/windows-event-log/wel/WindowsEventLog.cpp
+++ b/extensions/windows-event-log/wel/WindowsEventLog.cpp
@@ -19,6 +19,7 @@
#include "WindowsEventLog.h"
#include "UnicodeConversion.h"
#include "utils/Deleters.h"
+#include "utils/ScopeGuard.h"
#include <algorithm>
namespace org {
@@ -36,6 +37,12 @@
std::unique_ptr< EVT_VARIANT, utils::FreeDeleter> rendered_values;
auto context = EvtCreateRenderContext(0, NULL, EvtRenderContextSystem);
+ if (context == NULL) {
+ return;
+ }
+ utils::ScopeGuard contextGuard([&context](){
+ EvtClose(context);
+ });
if (!EvtRender(context, event_ptr_, EvtRenderEventValues, dwBufferSize, nullptr, &dwBufferUsed, &dwPropertyCount))
{
if (ERROR_INSUFFICIENT_BUFFER == (status = GetLastError()))