MINIFICPP-1058 - ConsumeWindowsEventLog should have a property that enables selection of output format

Signed-off-by: Daniel Bakai <bakaid@apache.org>
diff --git a/extensions/windows-event-log/ConsumeWindowsEventLog.cpp b/extensions/windows-event-log/ConsumeWindowsEventLog.cpp
index 54856db..75971fa 100644
--- a/extensions/windows-event-log/ConsumeWindowsEventLog.cpp
+++ b/extensions/windows-event-log/ConsumeWindowsEventLog.cpp
@@ -123,6 +123,14 @@
   withDescription("Comma seperated list of key/value pairs with the following keys LOG_NAME, SOURCE, TIME_CREATED,EVENT_RECORDID,EVENTID,TASK_CATEGORY,LEVEL,KEYWORDS,USER,COMPUTER, and EVENT_TYPE. Eliminating fields will remove them from the header.")->
   build());
 
+core::Property ConsumeWindowsEventLog::OutputFormat(
+  core::PropertyBuilder::createProperty("Output Format")->
+  isRequired(true)->
+  withDefaultValue(Both)->
+  withAllowableValues<std::string>({XML, Plaintext, Both})->
+  withDescription("Set the output format type. In case \'Both\' is selected the processor generates two flow files for every event captured")->
+  build());
+
 core::Relationship ConsumeWindowsEventLog::Success("success", "Relationship for successfully consumed events.");
 
 ConsumeWindowsEventLog::ConsumeWindowsEventLog(const std::string& name, utils::Identifier uuid)
@@ -135,6 +143,9 @@
   } else {
     LogWindowsError();
   }
+
+  writeXML_ = false;
+  writePlainText_ = false;
 }
 
 ConsumeWindowsEventLog::~ConsumeWindowsEventLog() {
@@ -142,7 +153,7 @@
 
 void ConsumeWindowsEventLog::initialize() {
   //! Set the supported properties
-  setSupportedProperties({Channel, Query, MaxBufferSize, InactiveDurationToReconnect, IdentifierMatcher, IdentifierFunction, ResolveAsAttributes, EventHeaderDelimiter, EventHeader });
+  setSupportedProperties({Channel, Query, MaxBufferSize, InactiveDurationToReconnect, IdentifierMatcher, IdentifierFunction, ResolveAsAttributes, EventHeaderDelimiter, EventHeader, OutputFormat});
 
   //! Set the supported relationships
   setSupportedRelationships({Success});
@@ -156,9 +167,7 @@
     header.emplace_back(std::make_pair(name, value));
     return true;
   }
-  else {
   return false;
-  }
 }
 
 void ConsumeWindowsEventLog::onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) {
@@ -196,6 +205,12 @@
     subscribe(context);
   }
 
+  std::string mode;
+  context->getProperty(OutputFormat.getName(), mode);
+
+  writeXML_ = (mode == Both || mode == XML);
+
+  writePlainText_ = (mode == Both || mode == Plaintext);
 }
 
 void ConsumeWindowsEventLog::onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) {
@@ -305,39 +320,45 @@
                 }
                 // this is a well known path. 
                 std::string providerName = doc.child("Event").child("System").child("Provider").attribute("Name").value();
-
-                auto handler = pConsumeWindowsEventLog->getEventLogHandler(providerName);
-                auto message = handler.getEventMessage(eventHandle);
+                wel::MetadataWalker walker(pConsumeWindowsEventLog->getEventLogHandler(providerName).getMetadata(),
+                    pConsumeWindowsEventLog->channel_, eventHandle, !pConsumeWindowsEventLog->resolve_as_attributes_,
+                    pConsumeWindowsEventLog->apply_identifier_function_, pConsumeWindowsEventLog->regex_);
 
                 // resolve the event metadata
-                wel::MetadataWalker walker(pConsumeWindowsEventLog->getEventLogHandler(providerName).getMetadata(), pConsumeWindowsEventLog->channel_, eventHandle, !pConsumeWindowsEventLog->resolve_as_attributes_, pConsumeWindowsEventLog->apply_identifier_function_, pConsumeWindowsEventLog->regex_);
                 doc.traverse(walker);
 
-                if (!message.empty())
-                {	
-                  for (const auto &mapEntry : walker.getIdentifiers()) {
-                    // replace the identifiers with their translated strings.
-                    utils::StringUtils::replaceAll(message, mapEntry.first, mapEntry.second);
+                if (pConsumeWindowsEventLog->writePlainText_) {
+                  auto handler = pConsumeWindowsEventLog->getEventLogHandler(providerName);
+                  auto message = handler.getEventMessage(eventHandle);
+
+                  if (!message.empty()) {
+
+                    for (const auto &mapEntry : walker.getIdentifiers()) {
+                      // replace the identifiers with their translated strings.
+                      utils::StringUtils::replaceAll(message, mapEntry.first, mapEntry.second);
+                    }
+                    wel::WindowsEventLogHeader log_header(pConsumeWindowsEventLog->header_names_);
+                    // set the delimiter
+                    log_header.setDelimiter(pConsumeWindowsEventLog->header_delimiter_);
+                    // render the header.
+                    renderedData.rendered_text_ = log_header.getEventHeader(&walker);
+                    renderedData.rendered_text_ += "Message" + pConsumeWindowsEventLog->header_delimiter_ + " ";
+                    renderedData.rendered_text_ += message;
                   }
-                  wel::WindowsEventLogHeader log_header(pConsumeWindowsEventLog->header_names_);
-                  // set the delimiter
-                  log_header.setDelimiter(pConsumeWindowsEventLog->header_delimiter_);
-                  // render the header.
-                  renderedData.rendered_text_ = log_header.getEventHeader(&walker);
-                  renderedData.rendered_text_ += "Message" + pConsumeWindowsEventLog->header_delimiter_ + " ";
-                  renderedData.rendered_text_ += message;
                 }
 
-                if (pConsumeWindowsEventLog->resolve_as_attributes_) {
-                  renderedData.matched_fields_ = walker.getFieldValues();
+                if (pConsumeWindowsEventLog->writeXML_) {
+                  if (pConsumeWindowsEventLog->resolve_as_attributes_) {
+                    renderedData.matched_fields_ = walker.getFieldValues();
+                  }
+
+                  wel::XmlString writer;
+                  doc.print(writer,"", pugi::format_raw); // no indentation or formatting
+                  xml = writer.xml_;
+
+                  renderedData.text_ = std::move(xml);
                 }
 
-                wel::XmlString writer;
-                doc.print(writer,"", pugi::format_raw); // no indentation or formatting
-                xml = writer.xml_;
-
-                renderedData.text_ = std::move(xml);
-
                 pConsumeWindowsEventLog->listRenderedData_.enqueue(std::move(renderedData));
               } else {
                 logger->log_error("EvtRender returned the following error code: %d.", GetLastError());
@@ -372,38 +393,46 @@
 {
   struct WriteCallback: public OutputStreamCallback {
     WriteCallback(const std::string& str)
-      : str_(str) {
+      : data_(str.c_str()), size_(str.length()) {
     }
 
     int64_t process(std::shared_ptr<io::BaseStream> stream) {
-      return stream->writeData((uint8_t*)&str_[0], str_.size());
+      return stream->writeData((uint8_t*)data_, size_);
     }
 
     std::string str_;
+    const char * data_;
+    const size_t size_;
   };
 
   int flowFileCount = 0;
 
   EventRender evt;
   while (listRenderedData_.try_dequeue(evt)) {
-    auto flowFile = session->create();
+    if (writeXML_) {
+      auto flowFile = session->create();
 
-    session->write(flowFile, &WriteCallback(evt.text_));
-    for (const auto &fieldMapping : evt.matched_fields_) {
-      if (!fieldMapping.second.empty()) {
-        session->putAttribute(flowFile, fieldMapping.first, fieldMapping.second);
+      session->write(flowFile, &WriteCallback(evt.text_));
+      for (const auto &fieldMapping : evt.matched_fields_) {
+        if (!fieldMapping.second.empty()) {
+          session->putAttribute(flowFile, fieldMapping.first, fieldMapping.second);
+        }
       }
+      session->putAttribute(flowFile, FlowAttributeKey(MIME_TYPE), "application/xml");
+      session->getProvenanceReporter()->receive(flowFile, provenanceUri_, getUUIDStr(), "Consume windows event logs",
+                                                0);
+      session->transfer(flowFile, Success);
     }
-    session->putAttribute(flowFile, FlowAttributeKey(MIME_TYPE), "application/xml");
-    session->getProvenanceReporter()->receive(flowFile, provenanceUri_, getUUIDStr(), "Consume windows event logs", 0);
-    session->transfer(flowFile, Success);
 
-    flowFile = session->create();
+    if (writePlainText_) {
+      auto flowFile = session->create();
 
-    session->write(flowFile, &WriteCallback(evt.rendered_text_));
-    session->putAttribute(flowFile, FlowAttributeKey(MIME_TYPE), "text/plain");
-    session->getProvenanceReporter()->receive(flowFile, provenanceUri_, getUUIDStr(), "Consume windows event logs", 0);
-    session->transfer(flowFile, Success);
+      session->write(flowFile, &WriteCallback(evt.rendered_text_));
+      session->putAttribute(flowFile, FlowAttributeKey(MIME_TYPE), "text/plain");
+      session->getProvenanceReporter()->receive(flowFile, provenanceUri_, getUUIDStr(), "Consume windows event logs",
+                                                0);
+      session->transfer(flowFile, Success);
+    }
 
     flowFileCount++;
   }
diff --git a/extensions/windows-event-log/ConsumeWindowsEventLog.h b/extensions/windows-event-log/ConsumeWindowsEventLog.h
index dae262c..d03d0a7 100644
--- a/extensions/windows-event-log/ConsumeWindowsEventLog.h
+++ b/extensions/windows-event-log/ConsumeWindowsEventLog.h
@@ -76,6 +76,7 @@
   static core::Property ResolveAsAttributes;
   static core::Property EventHeaderDelimiter;
   static core::Property EventHeader;
+  static core::Property OutputFormat;
 
   //! Supported Relationships
   static core::Relationship Success;
@@ -105,6 +106,11 @@
   bool insertHeaderName(wel::METADATA_NAMES &header, const std::string &key, const std::string &value);
 
   void LogWindowsError();
+
+  static constexpr const char * const XML = "XML";
+  static constexpr const char * const Both = "Both";
+  static constexpr const char * const Plaintext = "Plaintext";
+
 private:
 
   // Logger
@@ -125,6 +131,9 @@
   std::shared_ptr<core::ProcessSessionFactory> sessionFactory_;
   std::mutex cache_mutex_;
   std::map<std::string, wel::WindowsEventLogHandler > providers_;
+
+  bool writeXML_;
+  bool writePlainText_;
 };
 
 REGISTER_RESOURCE(ConsumeWindowsEventLog, "Windows Event Log Subscribe Callback to receive FlowFiles from Events on Windows.");