MINIFICPP-1058 - ConsumeWindowsEventLog should have a property that enables selection of output format
Signed-off-by: Daniel Bakai <bakaid@apache.org>
diff --git a/extensions/windows-event-log/ConsumeWindowsEventLog.cpp b/extensions/windows-event-log/ConsumeWindowsEventLog.cpp
index 54856db..75971fa 100644
--- a/extensions/windows-event-log/ConsumeWindowsEventLog.cpp
+++ b/extensions/windows-event-log/ConsumeWindowsEventLog.cpp
@@ -123,6 +123,14 @@
withDescription("Comma seperated list of key/value pairs with the following keys LOG_NAME, SOURCE, TIME_CREATED,EVENT_RECORDID,EVENTID,TASK_CATEGORY,LEVEL,KEYWORDS,USER,COMPUTER, and EVENT_TYPE. Eliminating fields will remove them from the header.")->
build());
+core::Property ConsumeWindowsEventLog::OutputFormat(
+ core::PropertyBuilder::createProperty("Output Format")->
+ isRequired(true)->
+ withDefaultValue(Both)->
+ withAllowableValues<std::string>({XML, Plaintext, Both})->
+ withDescription("Set the output format type. In case \'Both\' is selected the processor generates two flow files for every event captured")->
+ build());
+
core::Relationship ConsumeWindowsEventLog::Success("success", "Relationship for successfully consumed events.");
ConsumeWindowsEventLog::ConsumeWindowsEventLog(const std::string& name, utils::Identifier uuid)
@@ -135,6 +143,9 @@
} else {
LogWindowsError();
}
+
+ writeXML_ = false;
+ writePlainText_ = false;
}
ConsumeWindowsEventLog::~ConsumeWindowsEventLog() {
@@ -142,7 +153,7 @@
void ConsumeWindowsEventLog::initialize() {
//! Set the supported properties
- setSupportedProperties({Channel, Query, MaxBufferSize, InactiveDurationToReconnect, IdentifierMatcher, IdentifierFunction, ResolveAsAttributes, EventHeaderDelimiter, EventHeader });
+ setSupportedProperties({Channel, Query, MaxBufferSize, InactiveDurationToReconnect, IdentifierMatcher, IdentifierFunction, ResolveAsAttributes, EventHeaderDelimiter, EventHeader, OutputFormat});
//! Set the supported relationships
setSupportedRelationships({Success});
@@ -156,9 +167,7 @@
header.emplace_back(std::make_pair(name, value));
return true;
}
- else {
return false;
- }
}
void ConsumeWindowsEventLog::onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) {
@@ -196,6 +205,12 @@
subscribe(context);
}
+ std::string mode;
+ context->getProperty(OutputFormat.getName(), mode);
+
+ writeXML_ = (mode == Both || mode == XML);
+
+ writePlainText_ = (mode == Both || mode == Plaintext);
}
void ConsumeWindowsEventLog::onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) {
@@ -305,39 +320,45 @@
}
// this is a well known path.
std::string providerName = doc.child("Event").child("System").child("Provider").attribute("Name").value();
-
- auto handler = pConsumeWindowsEventLog->getEventLogHandler(providerName);
- auto message = handler.getEventMessage(eventHandle);
+ wel::MetadataWalker walker(pConsumeWindowsEventLog->getEventLogHandler(providerName).getMetadata(),
+ pConsumeWindowsEventLog->channel_, eventHandle, !pConsumeWindowsEventLog->resolve_as_attributes_,
+ pConsumeWindowsEventLog->apply_identifier_function_, pConsumeWindowsEventLog->regex_);
// resolve the event metadata
- wel::MetadataWalker walker(pConsumeWindowsEventLog->getEventLogHandler(providerName).getMetadata(), pConsumeWindowsEventLog->channel_, eventHandle, !pConsumeWindowsEventLog->resolve_as_attributes_, pConsumeWindowsEventLog->apply_identifier_function_, pConsumeWindowsEventLog->regex_);
doc.traverse(walker);
- if (!message.empty())
- {
- for (const auto &mapEntry : walker.getIdentifiers()) {
- // replace the identifiers with their translated strings.
- utils::StringUtils::replaceAll(message, mapEntry.first, mapEntry.second);
+ if (pConsumeWindowsEventLog->writePlainText_) {
+ auto handler = pConsumeWindowsEventLog->getEventLogHandler(providerName);
+ auto message = handler.getEventMessage(eventHandle);
+
+ if (!message.empty()) {
+
+ for (const auto &mapEntry : walker.getIdentifiers()) {
+ // replace the identifiers with their translated strings.
+ utils::StringUtils::replaceAll(message, mapEntry.first, mapEntry.second);
+ }
+ wel::WindowsEventLogHeader log_header(pConsumeWindowsEventLog->header_names_);
+ // set the delimiter
+ log_header.setDelimiter(pConsumeWindowsEventLog->header_delimiter_);
+ // render the header.
+ renderedData.rendered_text_ = log_header.getEventHeader(&walker);
+ renderedData.rendered_text_ += "Message" + pConsumeWindowsEventLog->header_delimiter_ + " ";
+ renderedData.rendered_text_ += message;
}
- wel::WindowsEventLogHeader log_header(pConsumeWindowsEventLog->header_names_);
- // set the delimiter
- log_header.setDelimiter(pConsumeWindowsEventLog->header_delimiter_);
- // render the header.
- renderedData.rendered_text_ = log_header.getEventHeader(&walker);
- renderedData.rendered_text_ += "Message" + pConsumeWindowsEventLog->header_delimiter_ + " ";
- renderedData.rendered_text_ += message;
}
- if (pConsumeWindowsEventLog->resolve_as_attributes_) {
- renderedData.matched_fields_ = walker.getFieldValues();
+ if (pConsumeWindowsEventLog->writeXML_) {
+ if (pConsumeWindowsEventLog->resolve_as_attributes_) {
+ renderedData.matched_fields_ = walker.getFieldValues();
+ }
+
+ wel::XmlString writer;
+ doc.print(writer,"", pugi::format_raw); // no indentation or formatting
+ xml = writer.xml_;
+
+ renderedData.text_ = std::move(xml);
}
- wel::XmlString writer;
- doc.print(writer,"", pugi::format_raw); // no indentation or formatting
- xml = writer.xml_;
-
- renderedData.text_ = std::move(xml);
-
pConsumeWindowsEventLog->listRenderedData_.enqueue(std::move(renderedData));
} else {
logger->log_error("EvtRender returned the following error code: %d.", GetLastError());
@@ -372,38 +393,46 @@
{
struct WriteCallback: public OutputStreamCallback {
WriteCallback(const std::string& str)
- : str_(str) {
+ : data_(str.c_str()), size_(str.length()) {
}
int64_t process(std::shared_ptr<io::BaseStream> stream) {
- return stream->writeData((uint8_t*)&str_[0], str_.size());
+ return stream->writeData((uint8_t*)data_, size_);
}
std::string str_;
+ const char * data_;
+ const size_t size_;
};
int flowFileCount = 0;
EventRender evt;
while (listRenderedData_.try_dequeue(evt)) {
- auto flowFile = session->create();
+ if (writeXML_) {
+ auto flowFile = session->create();
- session->write(flowFile, &WriteCallback(evt.text_));
- for (const auto &fieldMapping : evt.matched_fields_) {
- if (!fieldMapping.second.empty()) {
- session->putAttribute(flowFile, fieldMapping.first, fieldMapping.second);
+ session->write(flowFile, &WriteCallback(evt.text_));
+ for (const auto &fieldMapping : evt.matched_fields_) {
+ if (!fieldMapping.second.empty()) {
+ session->putAttribute(flowFile, fieldMapping.first, fieldMapping.second);
+ }
}
+ session->putAttribute(flowFile, FlowAttributeKey(MIME_TYPE), "application/xml");
+ session->getProvenanceReporter()->receive(flowFile, provenanceUri_, getUUIDStr(), "Consume windows event logs",
+ 0);
+ session->transfer(flowFile, Success);
}
- session->putAttribute(flowFile, FlowAttributeKey(MIME_TYPE), "application/xml");
- session->getProvenanceReporter()->receive(flowFile, provenanceUri_, getUUIDStr(), "Consume windows event logs", 0);
- session->transfer(flowFile, Success);
- flowFile = session->create();
+ if (writePlainText_) {
+ auto flowFile = session->create();
- session->write(flowFile, &WriteCallback(evt.rendered_text_));
- session->putAttribute(flowFile, FlowAttributeKey(MIME_TYPE), "text/plain");
- session->getProvenanceReporter()->receive(flowFile, provenanceUri_, getUUIDStr(), "Consume windows event logs", 0);
- session->transfer(flowFile, Success);
+ session->write(flowFile, &WriteCallback(evt.rendered_text_));
+ session->putAttribute(flowFile, FlowAttributeKey(MIME_TYPE), "text/plain");
+ session->getProvenanceReporter()->receive(flowFile, provenanceUri_, getUUIDStr(), "Consume windows event logs",
+ 0);
+ session->transfer(flowFile, Success);
+ }
flowFileCount++;
}
diff --git a/extensions/windows-event-log/ConsumeWindowsEventLog.h b/extensions/windows-event-log/ConsumeWindowsEventLog.h
index dae262c..d03d0a7 100644
--- a/extensions/windows-event-log/ConsumeWindowsEventLog.h
+++ b/extensions/windows-event-log/ConsumeWindowsEventLog.h
@@ -76,6 +76,7 @@
static core::Property ResolveAsAttributes;
static core::Property EventHeaderDelimiter;
static core::Property EventHeader;
+ static core::Property OutputFormat;
//! Supported Relationships
static core::Relationship Success;
@@ -105,6 +106,11 @@
bool insertHeaderName(wel::METADATA_NAMES &header, const std::string &key, const std::string &value);
void LogWindowsError();
+
+ static constexpr const char * const XML = "XML";
+ static constexpr const char * const Both = "Both";
+ static constexpr const char * const Plaintext = "Plaintext";
+
private:
// Logger
@@ -125,6 +131,9 @@
std::shared_ptr<core::ProcessSessionFactory> sessionFactory_;
std::mutex cache_mutex_;
std::map<std::string, wel::WindowsEventLogHandler > providers_;
+
+ bool writeXML_;
+ bool writePlainText_;
};
REGISTER_RESOURCE(ConsumeWindowsEventLog, "Windows Event Log Subscribe Callback to receive FlowFiles from Events on Windows.");