MINIFICPP-1139 - Implement better backpressure handling in ConsumeWindowsEventLog

Signed-off-by: Daniel Bakai <bakaid@apache.org>

This closes #741
diff --git a/extensions/windows-event-log/Bookmark.cpp b/extensions/windows-event-log/Bookmark.cpp
index ada6d16..e9ccd83 100644
--- a/extensions/windows-event-log/Bookmark.cpp
+++ b/extensions/windows-event-log/Bookmark.cpp
@@ -3,6 +3,7 @@
 #include <direct.h>
 
 #include "utils/file/FileUtils.h"
+#include "utils/ScopeGuard.h"
 
 namespace org {
 namespace apache {
@@ -10,53 +11,59 @@
 namespace minifi {
 namespace processors {
 
-Bookmark::Bookmark(const std::string& bookmarkRootDir, const std::string& uuid, std::shared_ptr<logging::Logger> logger)
+Bookmark::Bookmark(const std::wstring& channel, const std::wstring& query, const std::string& bookmarkRootDir, const std::string& uuid, std::shared_ptr<logging::Logger> logger)
   :logger_(logger) {
   if (!createUUIDDir(bookmarkRootDir, uuid, filePath_))
     return;
 
   filePath_ += "Bookmark.txt";
 
-  std::wstring bookmarkXml;
-  if (!getBookmarkXmlFromFile(bookmarkXml)) {
+  if (!getBookmarkXmlFromFile(bookmarkXml_)) {
     return;
   }
 
-  if (bookmarkXml.empty()) {
-    if (!(hBookmark_ = EvtCreateBookmark(0))) {
-      logger_->log_error("!EvtCreateBookmark error: %d", GetLastError());
+  if (!bookmarkXml_.empty()) {
+    if (hBookmark_ = EvtCreateBookmark(bookmarkXml_.c_str())) {
+      ok_ = true;
       return;
     }
 
-    hasBookmarkXml_ = false;
-  } else {
-    if (!(hBookmark_ = EvtCreateBookmark(bookmarkXml.c_str()))) {
-      logger_->log_error("!EvtCreateBookmark error: %d bookmarkXml_ '%s'", GetLastError(), bookmarkXml.c_str());
+    LOG_LAST_ERROR(EvtCreateBookmark);
 
-      // BookmarkXml can be corrupted - create hBookmark_, and create empty file. 
-      if (!(hBookmark_ = EvtCreateBookmark(0))) {
-        logger_->log_error("!EvtCreateBookmark error: %d", GetLastError());
-        return;
-      }
-
-      hasBookmarkXml_ = false;
-
-      ok_ = createEmptyBookmarkXmlFile();
-
+    bookmarkXml_.clear();
+    if (!createEmptyBookmarkXmlFile()) {
       return;
     }
-
-    hasBookmarkXml_ = true;
   }
 
-  ok_ = true;
+  if (!(hBookmark_ = EvtCreateBookmark(0))) {
+    LOG_LAST_ERROR(EvtCreateBookmark);
+    return;
+  }
+
+  const auto hEventResults = EvtQuery(0, channel.c_str(), query.c_str(), EvtQueryChannelPath);
+  if (!hEventResults) {
+    LOG_LAST_ERROR(EvtQuery);
+    return;
+  }
+  const utils::ScopeGuard guard_hEventResults([hEventResults]() { EvtClose(hEventResults); });
+
+  if (!EvtSeek(hEventResults, 0, 0, 0, EvtSeekRelativeToLast)) {
+    LOG_LAST_ERROR(EvtSeek);
+    return;
+  }
+
+  DWORD dwReturned{};
+  EVT_HANDLE hEvent{};
+  if (!EvtNext(hEventResults, 1, &hEvent, INFINITE, 0, &dwReturned)) {
+    LOG_LAST_ERROR(EvtNext);
+    return;
+  }
+
+  ok_ = saveBookmark(hEvent);
 }
 
 Bookmark::~Bookmark() {
-  if (file_.is_open()) {
-    file_.close();
-  }
-
   if (hBookmark_) {
     EvtClose(hBookmark_);
   }
@@ -66,11 +73,18 @@
   return ok_;
 }
   
-bool Bookmark::hasBookmarkXml() const {
-  return hasBookmarkXml_;
-}
+EVT_HANDLE Bookmark::getBookmarkHandleFromXML() {
+  if (hBookmark_) {
+    EvtClose(hBookmark_);
+    hBookmark_ = 0;
+  }
 
-EVT_HANDLE Bookmark::bookmarkHandle() const {
+  hBookmark_ = EvtCreateBookmark(bookmarkXml_.c_str());
+  if (!(hBookmark_ = EvtCreateBookmark(bookmarkXml_.c_str()))) {
+    LOG_LAST_ERROR(EvtCreateBookmark);
+    return 0;
+  }
+
   return hBookmark_;
 }
 
@@ -88,7 +102,7 @@
 
 bool Bookmark::getNewBookmarkXml(EVT_HANDLE hEvent, std::wstring& bookmarkXml) {
   if (!EvtUpdateBookmark(hBookmark_, hEvent)) {
-    logger_->log_error("!EvtUpdateBookmark error: %d.", GetLastError());
+    LOG_LAST_ERROR(EvtUpdateBookmark);
     return false;
   }
 
@@ -104,7 +118,7 @@
       std::vector<wchar_t> buf(bufferSize / 2 + 1);
 
       if (!EvtRender(0, hBookmark_, EvtRenderBookmark, bufferSize, &buf[0], &bufferUsed, &propertyCount)) {
-        logger_->log_error("!EvtRender error: %d.", GetLastError());
+        LOG_LAST_ERROR(EvtRender);
         return false;
       }
 
@@ -112,8 +126,8 @@
 
       return true;
     }
-    else if (ERROR_SUCCESS != (status = GetLastError())) {
-      logger_->log_error("!EvtRender error: %d.", GetLastError());
+    if (ERROR_SUCCESS != (status = GetLastError())) {
+      LOG_LAST_ERROR(EvtRender);
       return false;
     }
   }
@@ -121,7 +135,9 @@
   return false;
 }
 
-void Bookmark::saveBookmarkXml(std::wstring& bookmarkXml) {
+void Bookmark::saveBookmarkXml(const std::wstring& bookmarkXml) {
+  bookmarkXml_ = bookmarkXml;
+
   // Write new bookmark over old and in the end write '!'. Then new bookmark is read until '!'. This is faster than truncate.
   file_.seekp(std::ios::beg);
 
@@ -130,7 +146,6 @@
   file_.flush();
 }
 
-
 bool Bookmark::createEmptyBookmarkXmlFile() {
   if (file_.is_open()) {
     file_.close();
@@ -205,7 +220,7 @@
   // '!' should be at the end of bookmark.
   auto pos = bookmarkXml.find(L'!');
   if (std::wstring::npos == pos) {
-    logger_->log_error("No '!' in bookmarXml '%s'", bookmarkXml.c_str());
+    logger_->log_error("No '!' in bookmarXml '%ls'", bookmarkXml.c_str());
     bookmarkXml.clear();
     return createEmptyBookmarkXmlFile();
   }
diff --git a/extensions/windows-event-log/Bookmark.h b/extensions/windows-event-log/Bookmark.h
index 4435734..f89360b 100644
--- a/extensions/windows-event-log/Bookmark.h
+++ b/extensions/windows-event-log/Bookmark.h
@@ -14,30 +14,22 @@
 namespace minifi {
 namespace processors {
 
+#define LOG_LAST_ERROR(func) logger_->log_error("!"#func" error %x", GetLastError())
+
 class Bookmark
 {
 public:
-  Bookmark(const std::string& bookmarkRootDir, const std::string& uuid, std::shared_ptr<logging::Logger> logger);
+  Bookmark(const std::wstring& channel, const std::wstring& query, const std::string& bookmarkRootDir, const std::string& uuid, std::shared_ptr<logging::Logger> logger);
   ~Bookmark();
-
   operator bool() const;
-  
-  bool hasBookmarkXml() const;
-
-  EVT_HANDLE bookmarkHandle() const;
-
-  bool saveBookmark(EVT_HANDLE hEvent);
-
+  EVT_HANDLE getBookmarkHandleFromXML();
   bool getNewBookmarkXml(EVT_HANDLE hEvent, std::wstring& bookmarkXml);
-
-  void saveBookmarkXml(std::wstring& bookmarkXml);
+  void saveBookmarkXml(const std::wstring& bookmarkXml);
 private:
+  bool saveBookmark(EVT_HANDLE hEvent);
   bool createEmptyBookmarkXmlFile();
-
   bool createUUIDDir(const std::string& bookmarkRootDir, const std::string& uuid, std::string& dir);
-
   std::string filePath(const std::string& uuid);
-
   bool getBookmarkXmlFromFile(std::wstring& bookmarkXml);
 
 private:
@@ -46,7 +38,7 @@
   bool ok_{};
   EVT_HANDLE hBookmark_{};
   std::wfstream file_;
-  bool hasBookmarkXml_{};
+  std::wstring bookmarkXml_;
 };
 
 } /* namespace processors */
diff --git a/extensions/windows-event-log/ConsumeWindowsEventLog.cpp b/extensions/windows-event-log/ConsumeWindowsEventLog.cpp
index a6832a9..cbcfbab 100644
--- a/extensions/windows-event-log/ConsumeWindowsEventLog.cpp
+++ b/extensions/windows-event-log/ConsumeWindowsEventLog.cpp
@@ -29,6 +29,7 @@
 #include <iostream>
 #include <memory>
 #include <regex>
+#include <cinttypes>
 
 #include "wel/MetadataWalker.h"
 #include "wel/XMLString.h"
@@ -77,6 +78,7 @@
     " This specifies the maximum size in bytes that the buffer will be allowed to grow to. (Limiting the maximum size of an individual Event XML.)")->
   build());
 
+// !!! This property is obsolete since now subscription is not used, but leave since it might be is used already in config.yml.
 core::Property ConsumeWindowsEventLog::InactiveDurationToReconnect(
   core::PropertyBuilder::createProperty("Inactive Duration To Reconnect")->
   isRequired(true)->
@@ -197,17 +199,6 @@
   context->getProperty(EventHeaderDelimiter.getName(), header_delimiter_);
   context->getProperty(BatchCommitSize.getName(), batch_commit_size_);
 
-  std::string bookmarkDir;
-  context->getProperty(BookmarkRootDirectory.getName(), bookmarkDir);
-  if (bookmarkDir.empty()) {
-    logger_->log_error("State Directory is empty");
-  } else {
-    pBookmark_ = std::make_unique<Bookmark>(bookmarkDir, getUUIDStr(), logger_);
-    if (!*pBookmark_) {
-      pBookmark_.reset();
-    }
-  }
-
   std::string header;
   context->getProperty(EventHeader.getName(), header);
 
@@ -220,8 +211,7 @@
       if (!insertHeaderName(header_names_, key, value)) {
         logger_->log_error("%s is an invalid key for the header map", key);
       }
-    }
-    else if (splitKeyAndValue.size() == 1) {
+    } else if (splitKeyAndValue.size() == 1) {
      auto key = utils::StringUtils::trim(splitKeyAndValue.at(0));
      if (!insertHeaderName(header_names_, key, "")) {
        logger_->log_error("%s is an invalid key for the header map", key);
@@ -241,28 +231,45 @@
     if (GetSystemDirectory(systemDir, sizeof(systemDir))) {
       hMsobjsDll_ = LoadLibrary((systemDir + std::string("\\msobjs.dll")).c_str());
       if (!hMsobjsDll_) {
-        logger_->log_error("!LoadLibrary error %x", GetLastError());
+        LOG_LAST_ERROR(LoadLibrary);
       }
     } else {
-      logger_->log_error("!GetSystemDirectory error %x", GetLastError());
+      LOG_LAST_ERROR(GetSystemDirectory);
     }
   }
 
-  if (subscriptionHandle_) {
-    logger_->log_error("Processor already subscribed to Event Log, expected cleanup to unsubscribe.");
-  } else {
-    sessionFactory_ = sessionFactory;
+  context->getProperty(Channel.getName(), channel_);
+  wstrChannel_ = std::wstring(channel_.begin(), channel_.end());
 
-    subscribe(context);
-  }
-}
+  std::string query;
+  context->getProperty(Query.getName(), query);
+  wstrQuery_ = std::wstring(query.begin(), query.end());
 
-void ConsumeWindowsEventLog::onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) {
-  if (!subscriptionHandle_) {
-    if (!subscribe(context)) {
-      context->yield();
+  if (!pBookmark_) {
+    std::string bookmarkDir;
+    context->getProperty(BookmarkRootDirectory.getName(), bookmarkDir);
+    if (bookmarkDir.empty()) {
+      logger_->log_error("State Directory is empty");
       return;
     }
+    pBookmark_ = std::make_unique<Bookmark>(wstrChannel_, wstrQuery_, bookmarkDir, getUUIDStr(), logger_);
+    if (!*pBookmark_) {
+      pBookmark_.reset();
+      return;
+    }
+  }
+
+  context->getProperty(MaxBufferSize.getName(), maxBufferSize_);
+  logger_->log_debug("ConsumeWindowsEventLog: maxBufferSize_ %" PRIu64, maxBufferSize_);
+
+  provenanceUri_ = "winlog://" + computerName_ + "/" + channel_ + "?" + query;
+}
+
+
+void ConsumeWindowsEventLog::onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) {
+  if (!pBookmark_) {
+    context->yield();
+    return;
   }
 
   std::unique_lock<std::mutex> lock(onTriggerMutex_, std::try_to_lock);
@@ -271,18 +278,87 @@
     return;
   }
 
-  const auto flowFileCount = processQueue(session);
-
-  const auto now = GetTickCount64();
-
-  if (flowFileCount > 0) {
-    lastActivityTimestamp_ = now;
-  }
-  else if (inactiveDurationToReconnect_ > 0) {
-    if ((now - lastActivityTimestamp_) > inactiveDurationToReconnect_) {
-      logger_->log_info("Exceeds configured 'inactive duration to reconnect' %lld ms. Unsubscribe to reconnect..", inactiveDurationToReconnect_);
-      unsubscribe();
+  struct TimeDiff {
+    auto operator()() const {
+      return int64_t{ std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now() - time_).count() };
     }
+    const decltype(std::chrono::steady_clock::now()) time_ = std::chrono::steady_clock::now();
+  };
+
+  const auto commitAndSaveBookmark = [&] (const std::wstring& bookmarkXml) {
+    const TimeDiff timeDiff;
+    session->commit();
+    logger_->log_debug("processQueue commit took %" PRId64 " ms", timeDiff());
+
+    pBookmark_->saveBookmarkXml(bookmarkXml);
+
+    if (session->outgoingConnectionsFull("success")) {
+      logger_->log_debug("outgoingConnectionsFull");
+      return false;
+    }
+
+    return true;
+  };
+
+  size_t eventCount = 0;
+  const TimeDiff timeDiff;
+  utils::ScopeGuard timeGuard([&]() {
+    logger_->log_debug("processed %zu Events in %"  PRId64 " ms", eventCount, timeDiff());
+  });
+
+  size_t commitAndSaveBookmarkCount = 0;
+  std::wstring bookmarkXml;
+
+  const auto hEventResults = EvtQuery(0, wstrChannel_.c_str(), wstrQuery_.c_str(), EvtQueryChannelPath);
+  if (!hEventResults) {
+    LOG_LAST_ERROR(EvtQuery);
+    return;
+  }
+  const utils::ScopeGuard guard_hEventResults([hEventResults]() { EvtClose(hEventResults); });
+
+  auto hBookmark = pBookmark_->getBookmarkHandleFromXML();
+  if (!hBookmark) {
+    // Unrecovarable error.
+    pBookmark_.reset();
+    return;
+  }
+
+  if (!EvtSeek(hEventResults, 1, hBookmark, 0, EvtSeekRelativeToBookmark)) {
+    LOG_LAST_ERROR(EvtSeek);
+    return;
+  }
+
+  // Enumerate the events in the result set after the bookmarked event.
+  while (true) {
+    EVT_HANDLE hEvent{};
+    DWORD dwReturned{};
+    if (!EvtNext(hEventResults, 1, &hEvent, INFINITE, 0, &dwReturned)) {
+      if (ERROR_NO_MORE_ITEMS != GetLastError()) {
+        LOG_LAST_ERROR(EvtNext);
+      }
+      break;
+    }
+    const utils::ScopeGuard guard_hEvent([hEvent]() { EvtClose(hEvent); });
+
+    EventRender eventRender;
+    std::wstring newBookmarkXml;
+    if (createEventRender(hEvent, eventRender) && pBookmark_->getNewBookmarkXml(hEvent, newBookmarkXml)) {
+      bookmarkXml = std::move(newBookmarkXml);
+      eventCount++;
+      putEventRenderFlowFileToSession(eventRender, *session);
+
+      if (batch_commit_size_ != 0U && (eventCount % batch_commit_size_ == 0)) {
+        if (!commitAndSaveBookmark(bookmarkXml)) {
+          return;
+        }
+
+        commitAndSaveBookmarkCount = eventCount;
+      }
+    }
+  }
+
+  if (eventCount > commitAndSaveBookmarkCount) {
+    commitAndSaveBookmark(bookmarkXml);
   }
 }
 
@@ -301,7 +377,6 @@
   return providers_[name];
 } 
 
-
 // !!! Used a non-documented approach to resolve `%%` in XML via C:\Windows\System32\MsObjs.dll.
 // Links which mention this approach: 
 // https://social.technet.microsoft.com/Forums/Windows/en-US/340632d1-60f0-4cc5-ad6f-f8c841107d0d/translate-value-1833quot-on-impersonationlevel-and-similar-values?forum=winservergen
@@ -362,7 +437,7 @@
             // Add "" to xmlPercentageItemsResolutions_ - don't need to call FormaMessage for this 'key' again.
             xmlPercentageItemsResolutions_.insert({key, ""});
 
-            logger_->log_error("!FormatMessage error: %d. '%s' is not found in msobjs.dll.", GetLastError(), key.c_str());
+            logger_->log_error("!FormatMessage error: %x. '%s' is not found in msobjs.dll.", GetLastError(), key.c_str());
           }
         } else {
           value = it->second;
@@ -391,322 +466,122 @@
   doc.traverse(treeWalker);
 }
 
-void ConsumeWindowsEventLog::processEvent(EVT_HANDLE hEvent) {
+bool ConsumeWindowsEventLog::createEventRender(EVT_HANDLE hEvent, EventRender& eventRender) {
   DWORD size = 0;
   DWORD used = 0;
   DWORD propertyCount = 0;
-  if (!EvtRender(NULL, hEvent, EvtRenderEventXml, size, 0, &used, &propertyCount)) {
-    if (ERROR_INSUFFICIENT_BUFFER == GetLastError()) {
-      if (used > maxBufferSize_) {
-        logger_->log_error("Dropping event %x because it couldn't be rendered within %ll bytes.", hEvent, maxBufferSize_);
-        return;
-      }
-
-      size = used;
-      std::vector<wchar_t> buf(size / 2 + 1);
-      if (!EvtRender(NULL, hEvent, EvtRenderEventXml, size, &buf[0], &used, &propertyCount)) {
-        logger_->log_error("!EvtRender error: %d.", GetLastError());
-        return;
-      }
-
-      std::string xml = wel::to_string(&buf[0]);
-
-      pugi::xml_document doc;
-      pugi::xml_parse_result result = doc.load_string(xml.c_str());
-
-      if (!result) {
-        logger_->log_error("Invalid XML produced");
-        return;
-      }
-      // this is a well known path. 
-      std::string providerName = doc.child("Event").child("System").child("Provider").attribute("Name").value();
-      wel::MetadataWalker walker(getEventLogHandler(providerName).getMetadata(), channel_, hEvent, !resolve_as_attributes_, apply_identifier_function_, regex_);
-
-      // resolve the event metadata
-      doc.traverse(walker);
-
-      EventRender renderedData;
-
-      if (writePlainText_) {
-        auto handler = getEventLogHandler(providerName);
-        auto message = handler.getEventMessage(hEvent);
-
-        if (!message.empty()) {
-
-          for (const auto &mapEntry : walker.getIdentifiers()) {
-            // replace the identifiers with their translated strings.
-            utils::StringUtils::replaceAll(message, mapEntry.first, mapEntry.second);
-          }
-          wel::WindowsEventLogHeader log_header(header_names_);
-          // set the delimiter
-          log_header.setDelimiter(header_delimiter_);
-          // render the header.
-          renderedData.rendered_text_ = log_header.getEventHeader(&walker);
-          renderedData.rendered_text_ += "Message" + header_delimiter_ + " ";
-          renderedData.rendered_text_ += message;
-        }
-      }
-
-      if (writeXML_) {
-        substituteXMLPercentageItems(doc);
-
-        if (resolve_as_attributes_) {
-          renderedData.matched_fields_ = walker.getFieldValues();
-        }
-
-        wel::XmlString writer;
-        doc.print(writer, "", pugi::format_raw); // no indentation or formatting
-        xml = writer.xml_;
-
-        renderedData.text_ = std::move(xml);
-      }
-
-      if (pBookmark_) {
-        std::wstring bookmarkXml;
-        if (pBookmark_->getNewBookmarkXml(hEvent, bookmarkXml)) {
-          renderedData.bookmarkXml_ = bookmarkXml;
-        }
-      }
-
-      listRenderedData_.enqueue(std::move(renderedData));
-    }
-  }
-}
-
-bool ConsumeWindowsEventLog::processEventsAfterBookmark(EVT_HANDLE hEventResults, const std::wstring& channel, const std::wstring& query) {
-  if (!EvtSeek(hEventResults, 1, pBookmark_->bookmarkHandle(), 0, EvtSeekRelativeToBookmark)) {
-    logger_->log_error("!EvtSeek error %d.", GetLastError());
+  EvtRender(NULL, hEvent, EvtRenderEventXml, size, 0, &used, &propertyCount);
+  if (ERROR_INSUFFICIENT_BUFFER != GetLastError()) {
+    LOG_LAST_ERROR(EvtRender);
     return false;
   }
 
-  // Enumerate the events in the result set after the bookmarked event.
-  while (true) {
-    EVT_HANDLE hEvent{};
-    DWORD dwReturned{};
-    if (!EvtNext(hEventResults, 1, &hEvent, INFINITE, 0, &dwReturned)) {
-      DWORD status = ERROR_SUCCESS;
-      if (ERROR_NO_MORE_ITEMS != (status = GetLastError())) {
-        logger_->log_error("!EvtNext error %d.", status);
+  if (used > maxBufferSize_) {
+    logger_->log_error("Dropping event because it couldn't be rendered within %" PRIu64 " bytes.", maxBufferSize_);
+    return false;
+  }
+
+  size = used;
+  std::vector<wchar_t> buf(size / 2 + 1);
+  if (!EvtRender(NULL, hEvent, EvtRenderEventXml, size, &buf[0], &used, &propertyCount)) {
+    LOG_LAST_ERROR(EvtRender);
+    return false;
+  }
+
+  std::string xml = wel::to_string(&buf[0]);
+
+  pugi::xml_document doc;
+  pugi::xml_parse_result result = doc.load_string(xml.c_str());
+
+  if (!result) {
+    logger_->log_error("Invalid XML produced");
+    return false;
+  }
+
+  // this is a well known path. 
+  std::string providerName = doc.child("Event").child("System").child("Provider").attribute("Name").value();
+  wel::MetadataWalker walker(getEventLogHandler(providerName).getMetadata(), channel_, hEvent, !resolve_as_attributes_, apply_identifier_function_, regex_);
+
+  // resolve the event metadata
+  doc.traverse(walker);
+
+  if (writePlainText_) {
+    auto handler = getEventLogHandler(providerName);
+    auto message = handler.getEventMessage(hEvent);
+
+    if (!message.empty()) {
+
+      for (const auto &mapEntry : walker.getIdentifiers()) {
+        // replace the identifiers with their translated strings.
+        utils::StringUtils::replaceAll(message, mapEntry.first, mapEntry.second);
       }
-      break;
+      wel::WindowsEventLogHeader log_header(header_names_);
+      // set the delimiter
+      log_header.setDelimiter(header_delimiter_);
+      // render the header.
+      eventRender.rendered_text_ = log_header.getEventHeader(&walker);
+      eventRender.rendered_text_ += "Message" + header_delimiter_ + " ";
+      eventRender.rendered_text_ += message;
+    }
+  }
+
+  if (writeXML_) {
+    substituteXMLPercentageItems(doc);
+
+    if (resolve_as_attributes_) {
+      eventRender.matched_fields_ = walker.getFieldValues();
     }
 
-    processEvent(hEvent);
+    wel::XmlString writer;
+    doc.print(writer, "", pugi::format_raw); // no indentation or formatting
+    xml = writer.xml_;
 
-    EvtClose(hEvent);
+    eventRender.text_ = std::move(xml);
   }
 
   return true;
 }
 
-
-bool ConsumeWindowsEventLog::subscribe(const std::shared_ptr<core::ProcessContext> &context) {
-  context->getProperty(Channel.getName(), channel_);
-  context->getProperty(Query.getName(), query_);
-
-  context->getProperty(MaxBufferSize.getName(), maxBufferSize_);
-  logger_->log_debug("ConsumeWindowsEventLog: maxBufferSize_ %lld", maxBufferSize_);
-
-  provenanceUri_ = "winlog://" + computerName_ + "/" + channel_ + "?" + query_;
-
-  std::string strInactiveDurationToReconnect;
-  context->getProperty(InactiveDurationToReconnect.getName(), strInactiveDurationToReconnect);
-
-  // Get 'inactiveDurationToReconnect_'.
-  core::TimeUnit unit;
-  if (core::Property::StringToTime(strInactiveDurationToReconnect, inactiveDurationToReconnect_, unit) &&
-    core::Property::ConvertTimeUnitToMS(inactiveDurationToReconnect_, unit, inactiveDurationToReconnect_)) {
-    logger_->log_info("inactiveDurationToReconnect: [%lld] ms", inactiveDurationToReconnect_);
-  }
-
-  if (!pBookmark_) {
-    logger_->log_error("!pBookmark_");
-    return false;
-  }
-
-  auto channel = std::wstring(channel_.begin(), channel_.end());
-  auto query = std::wstring(query_.begin(), query_.end());
-
-  do {
-    auto hEventResults = EvtQuery(0, channel.c_str(), query.c_str(), EvtQueryChannelPath);
-    if (!hEventResults) {
-      logger_->log_error("!EvtQuery error: %d.", GetLastError());
-      // Consider it as a serious error.
-      return false;
-    }
-    const utils::ScopeGuard guard_hEventResults([hEventResults]() { EvtClose(hEventResults); });
-
-    if (pBookmark_->hasBookmarkXml()) {
-      if (!processEventsAfterBookmark(hEventResults, channel, query)) {
-        break;
-      }
-    } else {
-      // Seek to the last event in the hEventResults.
-      if (!EvtSeek(hEventResults, 0, 0, 0, EvtSeekRelativeToLast)) {
-        logger_->log_error("!EvtSeek error: %d.", GetLastError());
-        break;
-      }
-
-      DWORD dwReturned{};
-      EVT_HANDLE hEvent{};
-      if (!EvtNext(hEventResults, 1, &hEvent, INFINITE, 0, &dwReturned)) {
-        logger_->log_error("!EvtNext error: %d.", GetLastError());
-        break;
-      }
-
-      pBookmark_->saveBookmark(hEvent);
-    }
-  } while (false);
-
-  subscriptionHandle_ = EvtSubscribe(
-      NULL,
-      NULL,
-      channel.c_str(),
-      query.c_str(),
-      NULL,
-      this,
-      [](EVT_SUBSCRIBE_NOTIFY_ACTION action, PVOID pContext, EVT_HANDLE hEvent)
-      {
-        auto pConsumeWindowsEventLog = static_cast<ConsumeWindowsEventLog*>(pContext);
-
-        auto& logger = pConsumeWindowsEventLog->logger_;
-
-        if (action == EvtSubscribeActionError) {
-          if (ERROR_EVT_QUERY_RESULT_STALE == (DWORD)hEvent) {
-            logger->log_error("Received missing event notification. Consider triggering processor more frequently or increasing queue size.");
-          } else {
-            logger->log_error("Received the following Win32 error: %x", hEvent);
-          }
-        } else if (action == EvtSubscribeActionDeliver) {
-          pConsumeWindowsEventLog->processEvent(hEvent);
-        }
-
-        return 0UL;
-      },
-      EvtSubscribeToFutureEvents | EvtSubscribeStrict);
-
-  if (!subscriptionHandle_) {
-    logger_->log_error("Unable to subscribe with provided parameters, received the following error code: %d", GetLastError());
-    return false;
-  }
-
-  lastActivityTimestamp_ = GetTickCount64();
-
-  return true;
-}
-
-void ConsumeWindowsEventLog::unsubscribe()
+void ConsumeWindowsEventLog::putEventRenderFlowFileToSession(const EventRender& eventRender, core::ProcessSession& session)
 {
-  if (subscriptionHandle_) {
-    EvtClose(subscriptionHandle_);
-    subscriptionHandle_ = 0;
-  }
-}
-
-int ConsumeWindowsEventLog::processQueue(const std::shared_ptr<core::ProcessSession> &session)
-{
-  struct WriteCallback: public OutputStreamCallback {
+  struct WriteCallback : public OutputStreamCallback {
     WriteCallback(const std::string& str)
-      : data_(str.c_str()), size_(str.length()) {
+      : str_(str) {
     }
 
     int64_t process(std::shared_ptr<io::BaseStream> stream) {
-      return stream->writeData((uint8_t*)data_, size_);
+      return stream->writeData(reinterpret_cast<uint8_t*>(const_cast<char*>(str_.c_str())), str_.size());
     }
 
-    std::string str_;
-    const char * data_;
-    const size_t size_;
+    const std::string& str_;
   };
 
-  int flowFileCount = 0;
+  if (writeXML_) {
+    auto flowFile = session.create();
 
-  auto before_time = std::chrono::high_resolution_clock::now();
-  utils::ScopeGuard timeGuard([&](){
-    logger_->log_debug("processQueue processed %d Events in %llu ms",
-                      flowFileCount,
-                      std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::high_resolution_clock::now() - before_time).count());
-  });
-
-  bool commitAndSaveBookmark = false;
-
-  EventRender evt;
-  while (listRenderedData_.try_dequeue(evt)) {
-    commitAndSaveBookmark = true;
-
-    if (writeXML_) {
-      auto flowFile = session->create();
-
-      session->write(flowFile, &WriteCallback(evt.text_));
-      for (const auto &fieldMapping : evt.matched_fields_) {
-        if (!fieldMapping.second.empty()) {
-          session->putAttribute(flowFile, fieldMapping.first, fieldMapping.second);
-        }
+    session.write(flowFile, &WriteCallback(eventRender.text_));
+    for (const auto &fieldMapping : eventRender.matched_fields_) {
+      if (!fieldMapping.second.empty()) {
+        session.putAttribute(flowFile, fieldMapping.first, fieldMapping.second);
       }
-      session->putAttribute(flowFile, FlowAttributeKey(MIME_TYPE), "application/xml");
-      session->getProvenanceReporter()->receive(flowFile, provenanceUri_, getUUIDStr(), "Consume windows event logs",
-                                                0);
-      session->transfer(flowFile, Success);
     }
-
-    if (writePlainText_) {
-      auto flowFile = session->create();
-
-      session->write(flowFile, &WriteCallback(evt.rendered_text_));
-      session->putAttribute(flowFile, FlowAttributeKey(MIME_TYPE), "text/plain");
-      session->getProvenanceReporter()->receive(flowFile, provenanceUri_, getUUIDStr(), "Consume windows event logs",
-                                                0);
-      session->transfer(flowFile, Success);
-    }
-
-    flowFileCount++;
-
-    if (batch_commit_size_ != 0U && (flowFileCount % batch_commit_size_ == 0)) {
-      auto before_commit = std::chrono::high_resolution_clock::now();
-      session->commit();
-      logger_->log_debug("processQueue commit took %llu ms",
-                        std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::high_resolution_clock::now() - before_commit).count());
-
-      if (pBookmark_) {
-        pBookmark_->saveBookmarkXml(evt.bookmarkXml_);
-      }
-      if(session->outgoingConnectionsFull("success")) {
-        return flowFileCount;  // Enough flowfiles there
-      }
-
-      commitAndSaveBookmark = false;
-    }
+    session.putAttribute(flowFile, FlowAttributeKey(MIME_TYPE), "application/xml");
+    session.getProvenanceReporter()->receive(flowFile, provenanceUri_, getUUIDStr(), "Consume windows event logs", 0);
+    session.transfer(flowFile, Success);
   }
 
-  if (commitAndSaveBookmark) {
-    session->commit();
+  if (writePlainText_) {
+    auto flowFile = session.create();
 
-    if (pBookmark_) {
-      pBookmark_->saveBookmarkXml(evt.bookmarkXml_);
-    }
+    session.write(flowFile, &WriteCallback(eventRender.rendered_text_));
+    session.putAttribute(flowFile, FlowAttributeKey(MIME_TYPE), "text/plain");
+    session.getProvenanceReporter()->receive(flowFile, provenanceUri_, getUUIDStr(), "Consume windows event logs", 0);
+    session.transfer(flowFile, Success);
   }
-
-  return flowFileCount;
 }
 
 void ConsumeWindowsEventLog::notifyStop()
 {
-  unsubscribe();
-
-  if (listRenderedData_.size_approx() != 0) {
-    auto session = sessionFactory_->createSession();
-    if (session) {
-      logger_->log_info("Finishing processing leftover events");
-
-      processQueue(session);
-    } else {
-      logger_->log_error(
-        "Stopping the processor but there is no ProcessSessionFactory stored and there are messages in the internal queue. "
-        "Removing the processor now will clear the queue but will result in DATA LOSS. This is normally due to starting the processor, "
-        "receiving events and stopping before the onTrigger happens. The messages in the internal queue cannot finish processing until "
-        "the processor is triggered to run.");
-    }
-  }
 }
 
 void ConsumeWindowsEventLog::LogWindowsError()
@@ -723,7 +598,7 @@
     (LPTSTR)&lpMsg,
     0, NULL);
 
-  logger_->log_error("Error %d: %s\n", (int)error_id, (char *)lpMsg);
+  logger_->log_error("Error %x: %s\n", (int)error_id, (char *)lpMsg);
 
   LocalFree(lpMsg);
 }
diff --git a/extensions/windows-event-log/ConsumeWindowsEventLog.h b/extensions/windows-event-log/ConsumeWindowsEventLog.h
index 2314611..4085b76 100644
--- a/extensions/windows-event-log/ConsumeWindowsEventLog.h
+++ b/extensions/windows-event-log/ConsumeWindowsEventLog.h
@@ -46,7 +46,6 @@
 	std::map<std::string, std::string> matched_fields_;
 	std::string text_;
 	std::string rendered_text_;
-  std::wstring bookmarkXml_;
 };
 
 class Bookmark;
@@ -101,14 +100,11 @@
   
 
 protected:
-  bool subscribe(const std::shared_ptr<core::ProcessContext> &context);
-  void unsubscribe();
-  int processQueue(const std::shared_ptr<core::ProcessSession> &session);
+  void putEventRenderFlowFileToSession(const EventRender& eventRender, core::ProcessSession& session);
   wel::WindowsEventLogHandler getEventLogHandler(const std::string & name);
   bool insertHeaderName(wel::METADATA_NAMES &header, const std::string &key, const std::string &value);
   void LogWindowsError();
-  void processEvent(EVT_HANDLE eventHandle);
-  bool processEventsAfterBookmark(EVT_HANDLE hEventResults, const std::wstring& channel, const std::wstring& query);
+  bool createEventRender(EVT_HANDLE eventHandle, EventRender& eventRender);
   void substituteXMLPercentageItems(pugi::xml_document& doc);
 
   static constexpr const char * const XML = "XML";
@@ -120,19 +116,16 @@
   wel::METADATA_NAMES header_names_;
   std::string header_delimiter_;
   std::string channel_;
-  std::string query_;
+  std::wstring wstrChannel_;
+  std::wstring wstrQuery_;
   std::shared_ptr<logging::Logger> logger_;
   std::string regex_;
   bool resolve_as_attributes_;
   bool apply_identifier_function_;
-  moodycamel::ConcurrentQueue<EventRender> listRenderedData_;
   std::string provenanceUri_;
   std::string computerName_;
-  int64_t inactiveDurationToReconnect_{};
-  EVT_HANDLE subscriptionHandle_{};
   uint64_t maxBufferSize_{};
   DWORD lastActivityTimestamp_{};
-  std::shared_ptr<core::ProcessSessionFactory> sessionFactory_;
   std::mutex cache_mutex_;
   std::map<std::string, wel::WindowsEventLogHandler > providers_;
   uint64_t batch_commit_size_;