/**
 * @file ConsumeWindowsEventLog.cpp
 * ConsumeWindowsEventLog class implementation
 *
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "ConsumeWindowsEventLog.h"
#include <vector>
#include <queue>
#include <map>
#include <set>
#include <sstream>
#include <stdio.h>
#include <string>
#include <iostream>
#include <memory>
#include <regex>
#include <cinttypes>

#include "wel/MetadataWalker.h"
#include "wel/XMLString.h"
#include "wel/UnicodeConversion.h"

#include "io/BufferStream.h"
#include "core/ProcessContext.h"
#include "core/ProcessSession.h"
#include "Bookmark.h"
#include "utils/Deleters.h"

#include "utils/gsl.h"

#pragma comment(lib, "wevtapi.lib")
#pragma comment(lib, "ole32.lib")

namespace org {
namespace apache {
namespace nifi {
namespace minifi {
namespace processors {

// ConsumeWindowsEventLog
// Processor name string exposed as a static class member.
const std::string ConsumeWindowsEventLog::ProcessorName("ConsumeWindowsEventLog");
// Timeout, in milliseconds, handed to EvtNext when fetching the next event (see processEventLogs).
const int EVT_NEXT_TIMEOUT_MS = 500;

// "Channel": required, defaults to "System", supports expression language.
// Read in onSchedule into channel_ and used as the EvtQuery channel path in onTrigger.
core::Property ConsumeWindowsEventLog::Channel(
  core::PropertyBuilder::createProperty("Channel")->
  isRequired(true)->
  withDefaultValue("System")->
  withDescription("The Windows Event Log Channel to listen to.")->
  supportsExpressionLanguage(true)->
  build());

// "Query": required, defaults to "*" (no filtering), supports expression language.
// Read in onSchedule and passed to EvtQuery in onTrigger.
core::Property ConsumeWindowsEventLog::Query(
  core::PropertyBuilder::createProperty("Query")->
  isRequired(true)->
  withDefaultValue("*")->
  withDescription("XPath Query to filter events. (See https://msdn.microsoft.com/en-us/library/windows/desktop/dd996910(v=vs.85).aspx for examples.)")->
  supportsExpressionLanguage(true)->
  build());

// "Max Buffer Size": required data-size property, defaults to "1 MB".
// Read in onSchedule into maxBufferSize_; createEventRender drops events whose rendered XML needs more than this.
core::Property ConsumeWindowsEventLog::MaxBufferSize(
  core::PropertyBuilder::createProperty("Max Buffer Size")->
  isRequired(true)->
  withDefaultValue<core::DataSizeValue>("1 MB")->
  withDescription(
    "The individual Event Log XMLs are rendered to a buffer."
    " This specifies the maximum size in bytes that the buffer will be allowed to grow to. (Limiting the maximum size of an individual Event XML.)")->
  build());

// NOTE: This property is obsolete because subscriptions are no longer used; it is kept
// because existing config.yml files may still set it. Within this file it is only
// registered in initialize() and never read.
core::Property ConsumeWindowsEventLog::InactiveDurationToReconnect(
  core::PropertyBuilder::createProperty("Inactive Duration To Reconnect")->
  isRequired(true)->
  withDefaultValue<core::TimePeriodValue>("10 min")->
  withDescription(
    "If no new event logs are processed for the specified time period, "
    " this processor will try reconnecting to recover from a state where any further messages cannot be consumed."
    " Such situation can happen if Windows Event Log service is restarted, or ERROR_EVT_QUERY_RESULT_STALE (15011) is returned."
    " Setting no duration, e.g. '0 ms' disables auto-reconnection.")->
  build());

// "Identifier Match Regex": optional, defaults to ".*Sid".
// Read in onSchedule into regex_ and handed to wel::MetadataWalker in createEventRender.
core::Property ConsumeWindowsEventLog::IdentifierMatcher(
  core::PropertyBuilder::createProperty("Identifier Match Regex")->
  isRequired(false)->
  withDefaultValue(".*Sid")->
  withDescription("Regular Expression to match Subject Identifier Fields. These will be placed into the attributes of the FlowFile")->
  build());


// "Apply Identifier Function": optional boolean, defaults to true.
// Read in onSchedule into apply_identifier_function_ and handed to wel::MetadataWalker in createEventRender.
core::Property ConsumeWindowsEventLog::IdentifierFunction(
  core::PropertyBuilder::createProperty("Apply Identifier Function")->
  isRequired(false)->
  withDefaultValue<bool>(true)->
  withDescription("If true it will resolve SIDs matched in the 'Identifier Match Regex' to the DOMAIN\\USERNAME associated with that ID")->
  build());

// "Resolve Metadata in Attributes": optional boolean, defaults to true.
// Read in onSchedule into resolve_as_attributes_; createEventRender passes its negation to
// wel::MetadataWalker and, when true, copies the walker's field values into the event's matched fields.
core::Property ConsumeWindowsEventLog::ResolveAsAttributes(
  core::PropertyBuilder::createProperty("Resolve Metadata in Attributes")->
  isRequired(false)->
  withDefaultValue<bool>(true)->
  withDescription("If true, any metadata that is resolved ( such as IDs or keyword metadata ) will be placed into attributes, otherwise it will be replaced in the XML or text output")->
  build());


// "Event Header Delimiter": optional, no default value declared here.
// Read in onSchedule into header_delimiter_ and applied to the plain-text header in createEventRender.
core::Property ConsumeWindowsEventLog::EventHeaderDelimiter(
  core::PropertyBuilder::createProperty("Event Header Delimiter")->
  isRequired(false)->
  withDescription("If set, the chosen delimiter will be used in the Event output header. Otherwise, a colon followed by spaces will be used.")->
  build());


// "Event Header": optional comma-separated list of KEY=Label pairs.
// onSchedule splits the value on ',' and '=', trims both parts, and records every key that
// wel::WindowsEventLogMetadata::getMetadataFromString recognises.
core::Property ConsumeWindowsEventLog::EventHeader(
  core::PropertyBuilder::createProperty("Event Header")->
  isRequired(false)->
  withDefaultValue("LOG_NAME=Log Name, SOURCE = Source, TIME_CREATED = Date,EVENT_RECORDID=Record ID,EVENTID = Event ID,TASK_CATEGORY = Task Category,LEVEL = Level,KEYWORDS = Keywords,USER = User,COMPUTER = Computer, EVENT_TYPE = EventType")->
  withDescription("Comma seperated list of key/value pairs with the following keys LOG_NAME, SOURCE, TIME_CREATED,EVENT_RECORDID,EVENTID,TASK_CATEGORY,LEVEL,KEYWORDS,USER,COMPUTER, and EVENT_TYPE. Eliminating fields will remove them from the header.")->
  build());

// "Output Format": required; restricted to the XML, Plaintext and Both constants
// (declared outside this file), defaulting to Both.
// onSchedule derives writeXML_ and writePlainText_ from the selected value.
core::Property ConsumeWindowsEventLog::OutputFormat(
  core::PropertyBuilder::createProperty("Output Format")->
  isRequired(true)->
  withDefaultValue(Both)->
  withAllowableValues<std::string>({XML, Plaintext, Both})->
  withDescription("Set the output format type. In case \'Both\' is selected the processor generates two flow files for every event captured")->
  build());

// "Batch Commit Size": optional unsigned integer, defaults to 1000.
// Read in onSchedule into batch_commit_size_; processEventLogs treats 0 as "no limit".
core::Property ConsumeWindowsEventLog::BatchCommitSize(
  core::PropertyBuilder::createProperty("Batch Commit Size")->
  isRequired(false)->
  withDefaultValue<uint64_t>(1000U)->
  withDescription("Maximum number of Events to consume and create to Flow Files from before committing.")->
  build());

// "State Directory": optional, defaults to "CWELState"; deprecated per its description.
// onSchedule still reads it, rejects an empty value, and passes it to the Bookmark constructor.
core::Property ConsumeWindowsEventLog::BookmarkRootDirectory(
  core::PropertyBuilder::createProperty("State Directory")->
  isRequired(false)->
  withDefaultValue("CWELState")->
  withDescription("DEPRECATED. Only use it for state migration from the state file, supplying the legacy state directory.")->
  build());

// "Process Old Events": required boolean, defaults to false.
// Read in onSchedule and forwarded to the Bookmark constructor.
core::Property ConsumeWindowsEventLog::ProcessOldEvents(
  core::PropertyBuilder::createProperty("Process Old Events")->
  isRequired(true)->
  withDefaultValue<bool>(false)->
  withDescription("This property defines if old events (which are created before first time server is started) should be processed.")->
  build());

// The only relationship this processor registers; every flow file created in
// putEventRenderFlowFileToSession is transferred to it.
core::Relationship ConsumeWindowsEventLog::Success("success", "Relationship for successfully consumed events.");

/**
 * Builds the processor and caches the local computer name in computerName_.
 *
 * Both output modes start out disabled; onSchedule enables them according to
 * the "Output Format" property. If the computer name cannot be obtained the
 * Windows error is logged and computerName_ is left untouched.
 *
 * @param name processor name forwarded to core::Processor
 * @param uuid processor identifier forwarded to core::Processor
 */
ConsumeWindowsEventLog::ConsumeWindowsEventLog(const std::string& name, utils::Identifier uuid)
  : core::Processor(name, uuid),
    logger_(logging::LoggerFactory<ConsumeWindowsEventLog>::getLogger()),
    apply_identifier_function_(false),
    batch_commit_size_(0U) {
  writeXML_ = false;
  writePlainText_ = false;

  char name_buffer[MAX_COMPUTERNAME_LENGTH + 1];
  DWORD buffer_length = sizeof(name_buffer);
  if (!GetComputerName(name_buffer, &buffer_length)) {
    LogWindowsError();
    return;
  }
  computerName_ = name_buffer;
}

/**
 * Releases per-schedule resources when the processor is stopped.
 *
 * Takes on_trigger_mutex_ (blocking until a running onTrigger finishes), drops
 * the bookmark and unloads msobjs.dll if it was loaded by onSchedule. The
 * module handle is only cleared when FreeLibrary succeeds, so the destructor
 * can retry the release otherwise.
 */
void ConsumeWindowsEventLog::notifyStop() {
  std::lock_guard<std::mutex> lock(on_trigger_mutex_);
  logger_->log_trace("start notifyStop");
  bookmark_.reset();
  if (hMsobjsDll_) {
    if (FreeLibrary(hMsobjsDll_)) {
      hMsobjsDll_ = nullptr;
    } else {
      // Report the API that actually failed (this used to name LoadLibrary).
      LOG_LAST_ERROR(FreeLibrary);
    }
  }
  logger_->log_trace("finish notifyStop");
}

/**
 * Unloads msobjs.dll if it is still loaded. notifyStop() clears the handle
 * after a successful FreeLibrary, so this only acts on a module that was not
 * released there.
 */
ConsumeWindowsEventLog::~ConsumeWindowsEventLog() {
  if (hMsobjsDll_ != nullptr) {
    FreeLibrary(hMsobjsDll_);
  }
}

/**
 * Registers the properties and the single relationship this processor supports.
 */
void ConsumeWindowsEventLog::initialize() {
  // Supported properties, in declaration order.
  setSupportedProperties({
    Channel,
    Query,
    MaxBufferSize,
    InactiveDurationToReconnect,
    IdentifierMatcher,
    IdentifierFunction,
    ResolveAsAttributes,
    EventHeaderDelimiter,
    EventHeader,
    OutputFormat,
    BatchCommitSize,
    BookmarkRootDirectory,
    ProcessOldEvents
  });

  // Supported relationships.
  setSupportedRelationships({Success});
}

/**
 * Appends a (metadata id, label) entry to a header definition.
 *
 * @param header header definition to extend
 * @param key    textual metadata key, resolved through
 *               wel::WindowsEventLogMetadata::getMetadataFromString
 * @param value  label to store next to the resolved metadata id
 * @return true when the key was recognised and the entry appended,
 *         false when the key resolved to wel::METADATA::UNKNOWN
 */
bool ConsumeWindowsEventLog::insertHeaderName(wel::METADATA_NAMES &header, const std::string &key, const std::string & value) const {
  const wel::METADATA metadata_id = wel::WindowsEventLogMetadata::getMetadataFromString(key);
  if (metadata_id == wel::METADATA::UNKNOWN) {
    return false;
  }

  header.emplace_back(std::make_pair(metadata_id, value));
  return true;
}

/**
 * Reads the processor configuration and prepares everything onTrigger needs.
 *
 * Steps performed:
 *  - obtains the state manager (mandatory),
 *  - reads the identifier/attribute/header/batch properties,
 *  - rebuilds header_names_ from the "Event Header" property,
 *  - derives writeXML_ / writePlainText_ from "Output Format",
 *  - loads msobjs.dll from the system directory when XML output is enabled and
 *    the module is not loaded yet,
 *  - stores the channel and query as wide strings,
 *  - creates the Bookmark when none exists,
 *  - reads the maximum render buffer size and builds the provenance URI.
 *
 * @param context        source of property values and of the state manager
 * @param sessionFactory not used by this implementation
 * @throws Exception PROCESSOR_EXCEPTION when no state manager is available;
 *         PROCESS_SCHEDULE_EXCEPTION when the state directory is empty or the
 *         bookmark could not be created
 */
void ConsumeWindowsEventLog::onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) {
  state_manager_ = context->getStateManager();
  if (state_manager_ == nullptr) {
    throw Exception(PROCESSOR_EXCEPTION, "Failed to get StateManager");
  }

  context->getProperty(IdentifierMatcher.getName(), regex_);
  context->getProperty(ResolveAsAttributes.getName(), resolve_as_attributes_);
  context->getProperty(IdentifierFunction.getName(), apply_identifier_function_);
  context->getProperty(EventHeaderDelimiter.getName(), header_delimiter_);
  context->getProperty(BatchCommitSize.getName(), batch_commit_size_);

  std::string header;
  context->getProperty(EventHeader.getName(), header);

  // onSchedule can run more than once on the same instance (the bookmark and DLL
  // guards below rely on that), so start from an empty header definition instead
  // of appending the same fields again on every reschedule.
  header_names_.clear();

  auto keyValueSplit = utils::StringUtils::split(header, ",");
  for (const auto &kv : keyValueSplit) {
    auto splitKeyAndValue = utils::StringUtils::split(kv, "=");
    // Accept "KEY=Label" and bare "KEY" (empty label); anything else is skipped.
    if (splitKeyAndValue.size() != 1 && splitKeyAndValue.size() != 2) {
      continue;
    }
    auto key = utils::StringUtils::trim(splitKeyAndValue.at(0));
    std::string value;
    if (splitKeyAndValue.size() == 2) {
      value = utils::StringUtils::trim(splitKeyAndValue.at(1));
    }
    if (!insertHeaderName(header_names_, key, value)) {
      logger_->log_error("%s is an invalid key for the header map", key);
    }
  }

  std::string mode;
  context->getProperty(OutputFormat.getName(), mode);

  writeXML_ = (mode == Both || mode == XML);

  writePlainText_ = (mode == Both || mode == Plaintext);

  // msobjs.dll is needed by substituteXMLPercentageItems to resolve "%%<number>" items.
  if (writeXML_ && !hMsobjsDll_) {
    char systemDir[MAX_PATH];
    if (GetSystemDirectory(systemDir, sizeof(systemDir))) {
      hMsobjsDll_ = LoadLibrary((systemDir + std::string("\\msobjs.dll")).c_str());
      if (!hMsobjsDll_) {
        LOG_LAST_ERROR(LoadLibrary);
      }
    } else {
      LOG_LAST_ERROR(GetSystemDirectory);
    }
  }

  // NOTE(review): the narrow-to-wide conversions below widen each char individually,
  // which is only faithful for ASCII channel/query text - confirm that is acceptable.
  context->getProperty(Channel.getName(), channel_);
  wstrChannel_ = std::wstring(channel_.begin(), channel_.end());

  std::string query;
  context->getProperty(Query.getName(), query);
  wstrQuery_ = std::wstring(query.begin(), query.end());

  bool processOldEvents{};
  context->getProperty(ProcessOldEvents.getName(), processOldEvents);

  // notifyStop() resets bookmark_, so a new one is created after every stop.
  if (!bookmark_) {
    std::string bookmarkDir;
    context->getProperty(BookmarkRootDirectory.getName(), bookmarkDir);
    if (bookmarkDir.empty()) {
      logger_->log_error("State Directory is empty");
      throw Exception(PROCESS_SCHEDULE_EXCEPTION, "State Directory is empty");
    }
    bookmark_ = std::make_unique<Bookmark>(wstrChannel_, wstrQuery_, bookmarkDir, getUUID(), processOldEvents, state_manager_, logger_);
    if (!*bookmark_) {
      bookmark_.reset();
      throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Bookmark is empty");
    }
  }

  context->getProperty(MaxBufferSize.getName(), maxBufferSize_);
  logger_->log_debug("ConsumeWindowsEventLog: maxBufferSize_ %" PRIu64, maxBufferSize_);

  provenanceUri_ = "winlog://" + computerName_ + "/" + channel_ + "?" + query;
  logger_->log_trace("Successfully configured CWEL");
}

bool ConsumeWindowsEventLog::commitAndSaveBookmark(const std::wstring &bookmark_xml, const std::shared_ptr<core::ProcessSession> &session) {
  {
    const TimeDiff time_diff;
    session->commit();
    logger_->log_debug("processQueue commit took %" PRId64 " ms", time_diff());
  }

  if (!bookmark_->saveBookmarkXml(bookmark_xml)) {
    logger_->log_error("Failed to save bookmark xml");
  }

  if (session->outgoingConnectionsFull("success")) {
    logger_->log_debug("Outgoing success connection is full");
    return false;
  }

  return true;
}

/**
 * Pulls events one at a time from a query result set and queues a flow file for each.
 *
 * The loop stops when batch_commit_size_ events were processed (0 means no limit)
 * or when EvtNext reports ERROR_NO_MORE_ITEMS. An event only counts as processed
 * when it was rendered and a new bookmark could be produced for it.
 *
 * @param context             not used by this implementation
 * @param session             session receiving the created flow files
 * @param event_query_results query handle already positioned after the bookmark
 * @return the number of processed events and the bookmark XML of the last one
 *         (empty when nothing was processed)
 */
std::tuple<size_t, std::wstring> ConsumeWindowsEventLog::processEventLogs(const std::shared_ptr<core::ProcessContext> &context,
    const std::shared_ptr<core::ProcessSession> &session, const EVT_HANDLE& event_query_results) {
  size_t events_done = 0;
  std::wstring latest_bookmark_xml;
  logger_->log_trace("Enumerating the events in the result set after the bookmarked event.");

  for (;;) {
    const bool batch_complete = batch_commit_size_ != 0 && events_done >= batch_commit_size_;
    if (batch_complete) {
      break;
    }

    EVT_HANDLE event_handle{};
    DWORD fetched_count{};
    if (!EvtNext(event_query_results, 1, &event_handle, EVT_NEXT_TIMEOUT_MS, 0, &fetched_count)) {
      if (GetLastError() == ERROR_NO_MORE_ITEMS) {
        break;
      }
      /* According to MS this iteration should only end when the return value is false AND
        the error code is NO_MORE_ITEMS. See the following page for further details:
        https://docs.microsoft.com/en-us/windows/win32/api/winevt/nf-winevt-evtnext */
      // NOTE(review): every other error is retried without a bound, so a persistent
      // failure keeps this loop (and on_trigger_mutex_) busy - confirm this is intended.
      LogWindowsError("Failed to get next event");
      continue;
    }

    const auto close_event = gsl::finally([event_handle]() { EvtClose(event_handle); });
    logger_->log_trace("Succesfully got the next event, performing event rendering");

    EventRender rendered_event;
    std::wstring candidate_bookmark_xml;
    const bool event_ready = createEventRender(event_handle, rendered_event)
        && bookmark_->getNewBookmarkXml(event_handle, candidate_bookmark_xml);
    if (!event_ready) {
      continue;
    }

    latest_bookmark_xml = std::move(candidate_bookmark_xml);
    ++events_done;
    putEventRenderFlowFileToSession(rendered_event, *session);
  }

  logger_->log_trace("Finished enumerating events.");
  return std::make_tuple(events_done, latest_bookmark_xml);
}

/**
 * Runs one consumption cycle: query the channel, seek past the bookmark, turn the
 * following events into flow files, then commit and persist the new bookmark.
 *
 * The cycle is skipped when another trigger (or notifyStop) holds on_trigger_mutex_.
 * The processor yields when there is no bookmark, when the query or the seek fails,
 * when no event was processed, or when the outgoing connection is full.
 *
 * @param context used to yield the processor
 * @param session session that receives the flow files and is committed at the end
 */
void ConsumeWindowsEventLog::onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) {
  std::unique_lock<std::mutex> lock(on_trigger_mutex_, std::try_to_lock);
  if (!lock.owns_lock()) {
    logger_->log_warn("processor was triggered before previous listing finished, configuration should be revised!");
    return;
  }

  // bookmark_ is reset by notifyStop() while it holds on_trigger_mutex_, so it must
  // only be inspected once the lock is owned; checking it earlier leaves a window
  // in which it becomes null before it is dereferenced below.
  if (!bookmark_) {
    logger_->log_debug("bookmark_ is null");
    context->yield();
    return;
  }

  logger_->log_trace("CWEL onTrigger");

  size_t processed_event_count = 0;
  const TimeDiff time_diff;
  // Logs the outcome on every exit path; captures processed_event_count by reference.
  const auto timeGuard = gsl::finally([&]() {
    logger_->log_debug("processed %zu Events in %"  PRId64 " ms", processed_event_count, time_diff());
  });

  const auto event_query_results = EvtQuery(0, wstrChannel_.c_str(), wstrQuery_.c_str(), EvtQueryChannelPath);
  if (!event_query_results) {
    LOG_LAST_ERROR(EvtQuery);
    context->yield();
    return;
  }
  const auto guard_event_query_results = gsl::finally([event_query_results]() { EvtClose(event_query_results); });

  logger_->log_trace("Retrieved results in Channel: %ls with Query: %ls", wstrChannel_.c_str(), wstrQuery_.c_str());

  auto bookmark_handle = bookmark_->getBookmarkHandleFromXML();
  if (!bookmark_handle) {
    // Dropping the bookmark makes every later trigger yield at the check above.
    logger_->log_error("bookmark_handle is null, unrecoverable error!");
    bookmark_.reset();
    context->yield();
    return;
  }

  // Position the result set on the first event after the bookmarked one.
  if (!EvtSeek(event_query_results, 1, bookmark_handle, 0, EvtSeekRelativeToBookmark)) {
    LOG_LAST_ERROR(EvtSeek);
    context->yield();
    return;
  }

  refreshTimeZoneData();

  std::wstring bookmark_xml;
  std::tie(processed_event_count, bookmark_xml) = processEventLogs(context, session, event_query_results);

  // Nothing to commit, or downstream is full: back off until the next schedule.
  if (processed_event_count == 0 || !commitAndSaveBookmark(bookmark_xml, session)) {
    context->yield();
  }
}

/**
 * Returns the event log handler for a provider, creating and caching it on first use.
 *
 * Access to the cache is serialized with cache_mutex_. A new handler wraps the
 * result of EvtOpenPublisherMetadata for the provider name.
 *
 * @param name provider name used as the cache key
 * @return a copy of the cached handler
 */
wel::WindowsEventLogHandler ConsumeWindowsEventLog::getEventLogHandler(const std::string & name) {
  std::lock_guard<std::mutex> lock(cache_mutex_);
  logger_->log_trace("Getting Event Log Handler corresponding to %s", name.c_str());

  const auto cached = providers_.find(name);
  if (cached == std::end(providers_)) {
    const std::wstring wide_name(name.begin(), name.end());

    auto& slot = providers_[name];
    slot = wel::WindowsEventLogHandler(EvtOpenPublisherMetadata(NULL, wide_name.c_str(), NULL, 0, 0));
    logger_->log_trace("Not found the handler -> created handler for %s", name.c_str());
    return slot;
  }

  logger_->log_trace("Found the handler");
  return cached->second;
}

// !!! Uses a non-documented approach to resolve `%%` in XML via C:\Windows\System32\MsObjs.dll.
// Links which mention this approach:
// https://social.technet.microsoft.com/Forums/Windows/en-US/340632d1-60f0-4cc5-ad6f-f8c841107d0d/translate-value-1833quot-on-impersonationlevel-and-similar-values?forum=winservergen
// https://github.com/libyal/libevtx/blob/master/documentation/Windows%20XML%20Event%20Log%20(EVTX).asciidoc
// https://stackoverflow.com/questions/33498244/marshaling-a-message-table-resource
//
// Traverses the document and, in every node text, replaces each "%%<digits>" token with the
// message of that number looked up in msobjs.dll. Lookups (including failed ones) are cached in
// xmlPercentageItemsResolutions_; tokens that cannot be resolved are left untouched.
// Does nothing when msobjs.dll is not loaded.
void ConsumeWindowsEventLog::substituteXMLPercentageItems(pugi::xml_document& doc) {
  if (!hMsobjsDll_) {
    return;
  }

  struct TreeWalker: public pugi::xml_tree_walker {
    TreeWalker(HMODULE hMsobjsDll, std::unordered_map<std::string, std::string>& xmlPercentageItemsResolutions, std::shared_ptr<logging::Logger> logger)
      : hMsobjsDll_(hMsobjsDll), xmlPercentageItemsResolutions_(xmlPercentageItemsResolutions), logger_(logger) {
    }

    // Called for every node of the traversal; always returns true so the walk continues.
    bool for_each(pugi::xml_node& node) override {
      static const std::string percentages = "%%";

      bool percentagesReplaced = false;

      std::string nodeText = node.text().get();

      size_t searchPos = 0;
      while (std::string::npos != (searchPos = nodeText.find(percentages, searchPos))) {
        const size_t tokenPos = searchPos;
        const size_t numberPos = tokenPos + percentages.size();

        // Unless a replacement happens, scanning resumes right after the "%%".
        searchPos = numberPos;

        // Measure the digit run explicitly: std::stoul alone would also accept leading
        // whitespace or a sign, and the numeric value does not reveal how many characters
        // (e.g. leading zeros) the token occupies in the text.
        size_t numberEnd = numberPos;
        while (numberEnd < nodeText.size() && nodeText[numberEnd] >= '0' && nodeText[numberEnd] <= '9') {
          ++numberEnd;
        }
        if (numberEnd == numberPos) {
          continue;
        }

        unsigned long number = 0;
        try {
          number = std::stoul(nodeText.substr(numberPos, numberEnd - numberPos));
        } catch (const std::invalid_argument &) {
          continue;
        } catch (const std::out_of_range &) {
          // Too many digits to be a message id; leave the token as it is.
          continue;
        }

        const std::string key = std::to_string(number);

        std::string value;
        const auto it = xmlPercentageItemsResolutions_.find(key);
        if (it == xmlPercentageItemsResolutions_.end()) {
          LPTSTR pBuffer{};
          if (FormatMessage(
            FORMAT_MESSAGE_ALLOCATE_BUFFER | FORMAT_MESSAGE_FROM_HMODULE | FORMAT_MESSAGE_IGNORE_INSERTS,
            hMsobjsDll_,
            number,
            MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT),
            (LPTSTR)&pBuffer,
            1024,
            0
          )) {
            value = pBuffer;
            LocalFree(pBuffer);

            value = utils::StringUtils::trimRight(value);

            xmlPercentageItemsResolutions_.insert({key, value});
          } else {
            // Read the error code before anything else can overwrite it.
            const auto lastError = GetLastError();

            // Add "" to xmlPercentageItemsResolutions_ - no need to call FormatMessage for this 'key' again.
            xmlPercentageItemsResolutions_.insert({key, ""});

            logger_->log_error("!FormatMessage error: %x. '%s' is not found in msobjs.dll.", lastError, key.c_str());
          }
        } else {
          value = it->second;
        }

        if (!value.empty()) {
          // Replace exactly the "%%<digits>" span and continue after the inserted text,
          // so neither the replacement nor the text right behind it is skipped or rescanned.
          nodeText.replace(tokenPos, numberEnd - tokenPos, value);
          searchPos = tokenPos + value.size();

          percentagesReplaced = true;
        }
      }

      if (percentagesReplaced) {
        node.text().set(nodeText.c_str());
      }

      return true;
    }

   private:
    HMODULE hMsobjsDll_;
    std::unordered_map<std::string, std::string>& xmlPercentageItemsResolutions_;
    std::shared_ptr<logging::Logger> logger_;
  } treeWalker(hMsobjsDll_, xmlPercentageItemsResolutions_, logger_);

  doc.traverse(treeWalker);
}

/**
 * Renders one event into the representations selected by the output format.
 *
 * The event XML is first rendered into a stack buffer; when that is too small a
 * heap buffer of the reported size is used, unless that size exceeds
 * maxBufferSize_. The XML is then parsed and walked with wel::MetadataWalker.
 * Depending on writePlainText_ / writeXML_ the plain-text rendering
 * (header + message) and/or the XML text (plus matched fields when
 * resolve_as_attributes_ is set) are stored in eventRender.
 *
 * @param hEvent      handle of the event to render
 * @param eventRender receives the rendered output
 * @return false when rendering or XML parsing failed or the event is too large,
 *         true otherwise
 */
bool ConsumeWindowsEventLog::createEventRender(EVT_HANDLE hEvent, EventRender& eventRender) {
  logger_->log_trace("Rendering an event");

  WCHAR inline_storage[4096];
  DWORD capacity_bytes = sizeof(inline_storage);
  using Deleter = utils::StackAwareDeleter<WCHAR, utils::FreeDeleter>;
  // The deleter is told about the stack array so that only heap buffers get freed.
  std::unique_ptr<WCHAR, Deleter> render_buffer{inline_storage, Deleter{ inline_storage }};

  DWORD bytes_used = 0;
  DWORD property_count = 0;
  if (!EvtRender(NULL, hEvent, EvtRenderEventXml, capacity_bytes, render_buffer.get(), &bytes_used, &property_count)) {
    if (GetLastError() != ERROR_INSUFFICIENT_BUFFER) {
      LOG_LAST_ERROR(EvtRender);
      return false;
    }
    // bytes_used now holds the size EvtRender asked for.
    if (bytes_used > maxBufferSize_) {
      logger_->log_error("Dropping event because it couldn't be rendered within %" PRIu64 " bytes.", maxBufferSize_);
      return false;
    }
    capacity_bytes = bytes_used;
    render_buffer.reset((LPWSTR)malloc(capacity_bytes));
    if (!render_buffer) {
      return false;
    }
    if (!EvtRender(NULL, hEvent, EvtRenderEventXml, capacity_bytes, render_buffer.get(), &bytes_used, &property_count)) {
      LOG_LAST_ERROR(EvtRender);
      return false;
    }
  }

  logger_->log_debug("Event rendered with size %" PRIu32 ". Performing doc traversing...", bytes_used);

  std::string event_xml = wel::to_string(render_buffer.get());

  pugi::xml_document doc;
  const pugi::xml_parse_result parse_result = doc.load_string(event_xml.c_str());
  if (!parse_result) {
    logger_->log_error("Invalid XML produced");
    return false;
  }

  // Event/System/Provider@Name is a well known path.
  std::string providerName = doc.child("Event").child("System").child("Provider").attribute("Name").value();
  wel::WindowsEventLogMetadataImpl event_metadata{getEventLogHandler(providerName).getMetadata(), hEvent};
  wel::MetadataWalker walker{event_metadata, channel_, !resolve_as_attributes_, apply_identifier_function_, regex_};

  // Resolve the event metadata.
  doc.traverse(walker);

  logger_->log_debug("Finish doc traversing, performing writing...");

  if (writePlainText_) {
    logger_->log_trace("Writing event in plain text");

    auto message = getEventLogHandler(providerName).getEventMessage(hEvent);

    if (!message.empty()) {
      // Swap identifiers for their translated strings; an empty side most probably
      // means the ID resolution failed, so such pairs are skipped.
      for (const auto &identifier : walker.getIdentifiers()) {
        if (!identifier.first.empty() && !identifier.second.empty()) {
          utils::StringUtils::replaceAll(message, identifier.first, identifier.second);
        }
      }

      wel::WindowsEventLogHeader log_header(header_names_);
      log_header.setDelimiter(header_delimiter_);

      eventRender.rendered_text_ = log_header.getEventHeader([&walker](wel::METADATA field) { return walker.getMetadata(field); });
      eventRender.rendered_text_ += "Message" + header_delimiter_ + " ";
      eventRender.rendered_text_ += message;
    }
    logger_->log_trace("Finish writing in plain text");
  }

  if (writeXML_) {
    logger_->log_trace("Writing event in XML");
    substituteXMLPercentageItems(doc);
    logger_->log_trace("Finish substituting %% in XML");

    if (resolve_as_attributes_) {
      eventRender.matched_fields_ = walker.getFieldValues();
    }

    // Serialize without indentation or formatting.
    wel::XmlString writer;
    doc.print(writer, "", pugi::format_raw);
    event_xml = writer.xml_;

    eventRender.text_ = std::move(event_xml);
    logger_->log_trace("Finish writing in XML");
  }

  return true;
}

void ConsumeWindowsEventLog::refreshTimeZoneData() {
  DYNAMIC_TIME_ZONE_INFORMATION tzinfo;
  auto ret = GetDynamicTimeZoneInformation(&tzinfo);
  std::wstring tzstr;
  long tzbias = 0;
  bool dst = false;
  switch (ret) {
    case TIME_ZONE_ID_INVALID:
      logger_->log_error("Failed to get timezone information!");
      return;  // Don't update members in case we cannot get data
    case TIME_ZONE_ID_UNKNOWN:
      tzstr = tzinfo.StandardName;
      tzbias = tzinfo.Bias;
      break;
    case TIME_ZONE_ID_DAYLIGHT:
      tzstr = tzinfo.DaylightName;
      dst = true;
      // [[fallthrough]];
    case TIME_ZONE_ID_STANDARD:
      tzstr = tzstr.empty() ? tzinfo.StandardName : tzstr;  // Use standard timezome name in case there is no daylight name or in case it's not DST
      tzbias = tzinfo.Bias + (dst ? tzinfo.DaylightBias : tzinfo.StandardBias);
      break;
  }

  tzbias *= -1;  // WinApi specifies UTC = localtime + bias, but we need offset from UTC
  std::stringstream tzoffset;
  tzoffset << (tzbias >= 0 ? '+' : '-') << std::setfill('0') << std::setw(2) << std::abs(tzbias) / 60
      << ":" << std::setfill('0') << std::setw(2) << std::abs(tzbias) % 60;

  timezone_name_ = wel::to_string(tzstr.c_str());
  timezone_offset_ = tzoffset.str();

  logger_->log_trace("Timezone name: %s, offset: %s", timezone_name_, timezone_offset_);
}

void ConsumeWindowsEventLog::putEventRenderFlowFileToSession(const EventRender& eventRender, core::ProcessSession& session) const {
  struct WriteCallback : public OutputStreamCallback {
    WriteCallback(const std::string& str)
      : str_(str) {
    }

    int64_t process(const std::shared_ptr<io::BaseStream>& stream) {
      return stream->write(reinterpret_cast<uint8_t*>(const_cast<char*>(str_.c_str())), gsl::narrow<int>(str_.size()));
    }

    const std::string& str_;
  };

  if (writeXML_) {
    auto flowFile = session.create();
    logger_->log_trace("Writing rendered XML to a flow file");

    {
      WriteCallback wc{ eventRender.text_ };
      session.write(flowFile, &wc);
    }
    for (const auto &fieldMapping : eventRender.matched_fields_) {
      if (!fieldMapping.second.empty()) {
        session.putAttribute(flowFile, fieldMapping.first, fieldMapping.second);
      }
    }
    session.putAttribute(flowFile, core::SpecialFlowAttribute::MIME_TYPE, "application/xml");
    session.putAttribute(flowFile, "Timezone name", timezone_name_);
    session.putAttribute(flowFile, "Timezone offset", timezone_offset_);
    session.getProvenanceReporter()->receive(flowFile, provenanceUri_, getUUIDStr(), "Consume windows event logs", 0);
    session.transfer(flowFile, Success);
  }

  if (writePlainText_) {
    auto flowFile = session.create();
    logger_->log_trace("Writing rendered plain text to a flow file");

    {
      WriteCallback wc{ eventRender.rendered_text_ };
      session.write(flowFile, &wc);
    }
    session.putAttribute(flowFile, core::SpecialFlowAttribute::MIME_TYPE, "text/plain");
    session.putAttribute(flowFile, "Timezone name", timezone_name_);
    session.putAttribute(flowFile, "Timezone offset", timezone_offset_);
    session.getProvenanceReporter()->receive(flowFile, provenanceUri_, getUUIDStr(), "Consume windows event logs", 0);
    session.transfer(flowFile, Success);
  }
}

void ConsumeWindowsEventLog::LogWindowsError(std::string error) const {
  auto error_id = GetLastError();
  LPVOID lpMsg;

  FormatMessage(
    FORMAT_MESSAGE_ALLOCATE_BUFFER |
    FORMAT_MESSAGE_FROM_SYSTEM,
    NULL,
    error_id,
    MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT),
    (LPTSTR)&lpMsg,
    0, NULL);

  logger_->log_error((error + " %x: %s\n").c_str(), (int)error_id, (char *)lpMsg);

  LocalFree(lpMsg);
}

}  // namespace processors
}  // namespace minifi
}  // namespace nifi
}  // namespace apache
}  // namespace org
