blob: 77f0f900611bd298c401486df981c8d509f70a9c [file] [log] [blame]
/**
* @file ConsumeWindowsEventLog.cpp
* ConsumeWindowsEventLog class implementation
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "ConsumeWindowsEventLog.h"
#include <vector>
#include <queue>
#include <map>
#include <set>
#include <sstream>
#include <stdio.h>
#include <string>
#include <iostream>
#include <memory>
#include <regex>
#include "wel/MetadataWalker.h"
#include "wel/XMLString.h"
#include "wel/UnicodeConversion.h"
#include "utils/ScopeGuard.h"
#include "io/DataStream.h"
#include "core/ProcessContext.h"
#include "core/ProcessSession.h"
#pragma comment(lib, "wevtapi.lib")
#pragma comment(lib, "ole32.lib")
namespace org {
namespace apache {
namespace nifi {
namespace minifi {
namespace processors {
// Name string of this processor.
const std::string ConsumeWindowsEventLog::ProcessorName("ConsumeWindowsEventLog");
// "Channel": required, defaults to "System", supports expression language.
core::Property ConsumeWindowsEventLog::Channel(
core::PropertyBuilder::createProperty("Channel")->
isRequired(true)->
withDefaultValue("System")->
withDescription("The Windows Event Log Channel to listen to.")->
supportsExpressionLanguage(true)->
build());
// "Query": required XPath filter, defaults to "*" , supports expression language.
core::Property ConsumeWindowsEventLog::Query(
core::PropertyBuilder::createProperty("Query")->
isRequired(true)->
withDefaultValue("*")->
withDescription("XPath Query to filter events. (See https://msdn.microsoft.com/en-us/library/windows/desktop/dd996910(v=vs.85).aspx for examples.)")->
supportsExpressionLanguage(true)->
build());
// "Max Buffer Size": required data-size value, defaults to "1 MB"; upper bound for a single rendered event XML.
core::Property ConsumeWindowsEventLog::MaxBufferSize(
core::PropertyBuilder::createProperty("Max Buffer Size")->
isRequired(true)->
withDefaultValue<core::DataSizeValue>("1 MB")->
withDescription(
"The individual Event Log XMLs are rendered to a buffer."
" This specifies the maximum size in bytes that the buffer will be allowed to grow to. (Limiting the maximum size of an individual Event XML.)")->
build());
// "Inactive Duration To Reconnect": required time period, defaults to "10 min".
core::Property ConsumeWindowsEventLog::InactiveDurationToReconnect(
core::PropertyBuilder::createProperty("Inactive Duration To Reconnect")->
isRequired(true)->
withDefaultValue<core::TimePeriodValue>("10 min")->
withDescription(
"If no new event logs are processed for the specified time period, "
" this processor will try reconnecting to recover from a state where any further messages cannot be consumed."
" Such situation can happen if Windows Event Log service is restarted, or ERROR_EVT_QUERY_RESULT_STALE (15011) is returned."
" Setting no duration, e.g. '0 ms' disables auto-reconnection.")->
build());
// "Identifier Match Regex": optional, defaults to ".*Sid".
core::Property ConsumeWindowsEventLog::IdentifierMatcher(
core::PropertyBuilder::createProperty("Identifier Match Regex")->
isRequired(false)->
withDefaultValue(".*Sid")->
withDescription("Regular Expression to match Subject Identifier Fields. These will be placed into the attributes of the FlowFile")->
build());
// "Apply Identifier Function": optional boolean, defaults to true.
core::Property ConsumeWindowsEventLog::IdentifierFunction(
core::PropertyBuilder::createProperty("Apply Identifier Function")->
isRequired(false)->
withDefaultValue<bool>(true)->
withDescription("If true it will resolve SIDs matched in the 'Identifier Match Regex' to the DOMAIN\\USERNAME associated with that ID")->
build());
// "Resolve Metadata in Attributes": optional boolean, defaults to true.
core::Property ConsumeWindowsEventLog::ResolveAsAttributes(
core::PropertyBuilder::createProperty("Resolve Metadata in Attributes")->
isRequired(false)->
withDefaultValue<bool>(true)->
withDescription("If true, any metadata that is resolved ( such as IDs or keyword metadata ) will be placed into attributes, otherwise it will be replaced in the XML or text output")->
build());
// "Event Header Delimiter": optional, no default value is declared here.
core::Property ConsumeWindowsEventLog::EventHeaderDelimiter(
core::PropertyBuilder::createProperty("Event Header Delimiter")->
isRequired(false)->
withDescription("If set, the chosen delimiter will be used in the Event output header. Otherwise, a colon followed by spaces will be used.")->
build());
// "Event Header": optional comma separated KEY=Label list; onSchedule() splits it on ',' and '='.
core::Property ConsumeWindowsEventLog::EventHeader(
core::PropertyBuilder::createProperty("Event Header")->
isRequired(false)->
withDefaultValue("LOG_NAME=Log Name, SOURCE = Source, TIME_CREATED = Date,EVENT_RECORDID=Record ID,EVENTID = Event ID,TASK_CATEGORY = Task Category,LEVEL = Level,KEYWORDS = Keywords,USER = User,COMPUTER = Computer, EVENT_TYPE = EventType")->
withDescription("Comma seperated list of key/value pairs with the following keys LOG_NAME, SOURCE, TIME_CREATED,EVENT_RECORDID,EVENTID,TASK_CATEGORY,LEVEL,KEYWORDS,USER,COMPUTER, and EVENT_TYPE. Eliminating fields will remove them from the header.")->
build());
// "Output Format": required; restricted to XML, Plaintext or Both (names declared outside this file's visible part), defaults to Both.
core::Property ConsumeWindowsEventLog::OutputFormat(
core::PropertyBuilder::createProperty("Output Format")->
isRequired(true)->
withDefaultValue(Both)->
withAllowableValues<std::string>({XML, Plaintext, Both})->
withDescription("Set the output format type. In case \'Both\' is selected the processor generates two flow files for every event captured")->
build());
// "Batch Commit Size": optional unsigned integer, defaults to 1000; processQueue() skips batch commits when it is 0.
core::Property ConsumeWindowsEventLog::BatchCommitSize(
core::PropertyBuilder::createProperty("Batch Commit Size")->
isRequired(false)->
withDefaultValue<uint64_t>(1000U)->
withDescription("Maximum number of Events to consume and create to Flow Files from before committing.")->
build());
// The relationship every flow file created by processQueue() is transferred to.
core::Relationship ConsumeWindowsEventLog::Success("success", "Relationship for successfully consumed events.");
/**
 * Creates the processor and caches the local computer name, which is later
 * used by subscribe() when composing the provenance URI.
 *
 * Both output modes start disabled; onSchedule() enables them according to
 * the "Output Format" property.
 *
 * @param name processor name forwarded to core::Processor
 * @param uuid processor identifier forwarded to core::Processor
 */
ConsumeWindowsEventLog::ConsumeWindowsEventLog(const std::string& name, utils::Identifier uuid)
  : core::Processor(name, uuid),
    logger_(logging::LoggerFactory<ConsumeWindowsEventLog>::getLogger()),
    apply_identifier_function_(false),
    batch_commit_size_(0U) {
  // The buffer is a narrow (char) buffer, so the ANSI entry point is called
  // explicitly. The GetComputerName macro would resolve to the wide-character
  // variant when UNICODE is defined and no longer match the buffer type.
  char nameBuffer[MAX_COMPUTERNAME_LENGTH + 1];
  DWORD nameBufferSize = sizeof(nameBuffer);
  if (GetComputerNameA(nameBuffer, &nameBufferSize)) {
    computerName_ = nameBuffer;
  } else {
    LogWindowsError();
  }

  writeXML_ = false;
  writePlainText_ = false;
}
// Nothing to release explicitly here; member destructors do the work.
ConsumeWindowsEventLog::~ConsumeWindowsEventLog() = default;
/**
 * Declares the properties and the relationship this processor supports.
 */
void ConsumeWindowsEventLog::initialize() {
  //! Supported properties
  setSupportedProperties({
      Channel,
      Query,
      MaxBufferSize,
      InactiveDurationToReconnect,
      IdentifierMatcher,
      IdentifierFunction,
      ResolveAsAttributes,
      EventHeaderDelimiter,
      EventHeader,
      OutputFormat,
      BatchCommitSize});

  //! Supported relationships
  setSupportedRelationships({Success});
}
/**
 * Appends a (metadata id, label) entry to the header list when the key is known.
 *
 * @param header list the entry is appended to
 * @param key    textual metadata key, resolved through WindowsEventLogMetadata::getMetadataFromString
 * @param value  label stored alongside the resolved metadata id
 * @return false when the key resolves to METADATA::UNKNOWN (nothing is appended), true otherwise
 */
bool ConsumeWindowsEventLog::insertHeaderName(wel::METADATA_NAMES &header, const std::string &key, const std::string & value) {
  const wel::METADATA metadataId = wel::WindowsEventLogMetadata::getMetadataFromString(key);
  if (metadataId == wel::METADATA::UNKNOWN) {
    return false;
  }

  header.emplace_back(std::make_pair(metadataId, value));
  return true;
}
/**
 * Loads the processor configuration and opens the event log subscription.
 *
 * The "Event Header" property is parsed as a comma separated list of
 * KEY[=Label] entries; entries with an unknown key are reported and skipped.
 *
 * @param context        source of the configured property values
 * @param sessionFactory stored so that notifyStop() can create a session for leftover events
 */
void ConsumeWindowsEventLog::onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) {
  context->getProperty(IdentifierMatcher.getName(), regex_);
  context->getProperty(ResolveAsAttributes.getName(), resolve_as_attributes_);
  context->getProperty(IdentifierFunction.getName(), apply_identifier_function_);
  context->getProperty(EventHeaderDelimiter.getName(), header_delimiter_);
  context->getProperty(BatchCommitSize.getName(), batch_commit_size_);

  std::string header;
  context->getProperty(EventHeader.getName(), header);

  // Start from an empty list: without this every (re)schedule would append the
  // same header fields again and the rendered header would contain duplicates.
  header_names_.clear();

  auto keyValueSplit = utils::StringUtils::split(header, ",");
  for (const auto &kv : keyValueSplit) {
    auto splitKeyAndValue = utils::StringUtils::split(kv, "=");
    const auto partCount = splitKeyAndValue.size();
    if (partCount != 1 && partCount != 2) {
      // Empty entries and entries with more than one '=' are ignored.
      continue;
    }

    auto key = utils::StringUtils::trim(splitKeyAndValue.at(0));
    // A bare key (no '=') is registered with an empty label.
    std::string value;
    if (partCount == 2) {
      value = utils::StringUtils::trim(splitKeyAndValue.at(1));
    }

    if (!insertHeaderName(header_names_, key, value)) {
      logger_->log_error("%s is an invalid key for the header map", key);
    }
  }

  // The subscription callback reads these flags when rendering an event, so
  // they are assigned before the subscription is opened.
  std::string mode;
  context->getProperty(OutputFormat.getName(), mode);
  writeXML_ = (mode == Both || mode == XML);
  writePlainText_ = (mode == Both || mode == Plaintext);

  if (subscriptionHandle_) {
    logger_->log_error("Processor already subscribed to Event Log, expected cleanup to unsubscribe.");
  } else {
    sessionFactory_ = sessionFactory;
    // A failed subscription is retried from onTrigger().
    subscribe(context);
  }
}
/**
 * Drains the queue of rendered events into flow files and tracks inactivity.
 *
 * When there is no subscription, one is attempted first; if that fails the
 * processor yields. When no event was processed and the configured inactive
 * duration has elapsed since the last activity, the subscription is closed so
 * that the next trigger reconnects.
 *
 * @param context used to (re)subscribe and to yield
 * @param session session the flow files are created in
 */
void ConsumeWindowsEventLog::onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) {
  if (!subscriptionHandle_ && !subscribe(context)) {
    context->yield();
    return;
  }

  const auto processedCount = processQueue(session);
  const auto currentTick = GetTickCount64();

  if (processedCount > 0) {
    lastActivityTimestamp_ = currentTick;
    return;
  }

  // A non-positive duration disables the reconnect logic.
  const bool reconnectEnabled = inactiveDurationToReconnect_ > 0;
  if (reconnectEnabled && (currentTick - lastActivityTimestamp_) > inactiveDurationToReconnect_) {
    logger_->log_info("Exceeds configured 'inactive duration to reconnect' %lld ms. Unsubscribe to reconnect..", inactiveDurationToReconnect_);
    unsubscribe();
  }
}
/**
 * Returns the handler cached for a provider name, creating and caching it on first use.
 *
 * The lookup and the insertion happen while holding cache_mutex_.
 *
 * @param name provider name; widened character by character for the Windows call
 * @return copy of the cached handler
 */
wel::WindowsEventLogHandler ConsumeWindowsEventLog::getEventLogHandler(const std::string & name) {
  std::lock_guard<std::mutex> guard(cache_mutex_);

  const auto cached = providers_.find(name);
  if (cached != providers_.end()) {
    return cached->second;
  }

  const std::wstring wideName(name.begin(), name.end());
  auto &entry = providers_[name];
  entry = wel::WindowsEventLogHandler(EvtOpenPublisherMetadata(NULL, wideName.c_str(), NULL, 0, 0));
  return entry;
}
/**
 * Reads the subscription related properties and subscribes to future events
 * of the configured channel.
 *
 * The callback passed to EvtSubscribe renders each delivered event as XML,
 * parses it, resolves its metadata with a MetadataWalker and enqueues the
 * result (XML and/or plain text, depending on writeXML_ / writePlainText_)
 * into listRenderedData_, from which processQueue() creates flow files.
 *
 * @param context source of the Channel, Query, Max Buffer Size and
 *                Inactive Duration To Reconnect property values
 * @return true when EvtSubscribe returned a subscription handle, false otherwise
 */
bool ConsumeWindowsEventLog::subscribe(const std::shared_ptr<core::ProcessContext> &context) {
  context->getProperty(Channel.getName(), channel_);

  std::string query;
  context->getProperty(Query.getName(), query);

  context->getProperty(MaxBufferSize.getName(), maxBufferSize_);
  logger_->log_debug("ConsumeWindowsEventLog: maxBufferSize_ %lld", maxBufferSize_);

  provenanceUri_ = "winlog://" + computerName_ + "/" + channel_ + "?" + query;

  std::string strInactiveDurationToReconnect;
  context->getProperty(InactiveDurationToReconnect.getName(), strInactiveDurationToReconnect);

  // Get 'inactiveDurationToReconnect_' (converted to milliseconds).
  core::TimeUnit unit;
  if (core::Property::StringToTime(strInactiveDurationToReconnect, inactiveDurationToReconnect_, unit) &&
      core::Property::ConvertTimeUnitToMS(inactiveDurationToReconnect_, unit, inactiveDurationToReconnect_)) {
    logger_->log_info("inactiveDurationToReconnect: [%lld] ms", inactiveDurationToReconnect_);
  }

  subscriptionHandle_ = EvtSubscribe(
      NULL,
      NULL,
      std::wstring(channel_.begin(), channel_.end()).c_str(),
      std::wstring(query.begin(), query.end()).c_str(),
      NULL,
      this,
      [](EVT_SUBSCRIBE_NOTIFY_ACTION action, PVOID pContext, EVT_HANDLE eventHandle)
      {
        auto pConsumeWindowsEventLog = static_cast<ConsumeWindowsEventLog*>(pContext);
        auto& logger = pConsumeWindowsEventLog->logger_;

        if (action == EvtSubscribeActionError) {
          // For error notifications the handle argument carries a Win32 error
          // code rather than an event handle; convert it once, explicitly.
          const DWORD errorCode = static_cast<DWORD>(reinterpret_cast<ULONG_PTR>(eventHandle));
          if (ERROR_EVT_QUERY_RESULT_STALE == errorCode) {
            logger->log_error("Received missing event notification. Consider triggering processor more frequently or increasing queue size.");
          } else {
            logger->log_error("Received the following Win32 error: %x", errorCode);
          }
          return 0UL;
        }

        if (action != EvtSubscribeActionDeliver) {
          return 0UL;
        }

        DWORD size = 0;
        DWORD used = 0;
        DWORD propertyCount = 0;

        // First pass with an empty buffer: it is expected to fail with
        // ERROR_INSUFFICIENT_BUFFER and report the required size in 'used'.
        if (EvtRender(NULL, eventHandle, EvtRenderEventXml, size, 0, &used, &propertyCount)) {
          return 0UL;
        }
        const DWORD sizingError = GetLastError();
        if (ERROR_INSUFFICIENT_BUFFER != sizingError) {
          // Any other failure means the event cannot be rendered; report it instead of dropping it silently.
          logger->log_error("EvtRender returned the following error code: %d.", sizingError);
          return 0UL;
        }

        if (used > pConsumeWindowsEventLog->maxBufferSize_) {
          logger->log_error("Dropping event %p because it couldn't be rendered within %lld bytes.", eventHandle, pConsumeWindowsEventLog->maxBufferSize_);
          return 0UL;
        }

        size = used;
        // 'size' is treated as a byte count: the buffer holds size / 2 wide
        // characters plus one zero-initialized spare element.
        std::vector<wchar_t> buf(size/2 + 1);
        if (!EvtRender(NULL, eventHandle, EvtRenderEventXml, size, &buf[0], &used, &propertyCount)) {
          logger->log_error("EvtRender returned the following error code: %d.", GetLastError());
          return 0UL;
        }

        std::string xml = wel::to_string(&buf[0]);

        EventRender renderedData;

        pugi::xml_document doc;
        pugi::xml_parse_result result = doc.load_string(xml.c_str());
        if (!result) {
          logger->log_error("Invalid XML produced");
          return 0UL;
        }

        // this is a well known path.
        std::string providerName = doc.child("Event").child("System").child("Provider").attribute("Name").value();

        wel::MetadataWalker walker(pConsumeWindowsEventLog->getEventLogHandler(providerName).getMetadata(),
            pConsumeWindowsEventLog->channel_, eventHandle, !pConsumeWindowsEventLog->resolve_as_attributes_,
            pConsumeWindowsEventLog->apply_identifier_function_, pConsumeWindowsEventLog->regex_);

        // resolve the event metadata
        doc.traverse(walker);

        if (pConsumeWindowsEventLog->writePlainText_) {
          auto handler = pConsumeWindowsEventLog->getEventLogHandler(providerName);
          auto message = handler.getEventMessage(eventHandle);

          if (!message.empty()) {
            for (const auto &mapEntry : walker.getIdentifiers()) {
              // replace the identifiers with their translated strings.
              utils::StringUtils::replaceAll(message, mapEntry.first, mapEntry.second);
            }
            wel::WindowsEventLogHeader log_header(pConsumeWindowsEventLog->header_names_);
            // set the delimiter
            log_header.setDelimiter(pConsumeWindowsEventLog->header_delimiter_);
            // render the header.
            renderedData.rendered_text_ = log_header.getEventHeader(&walker);
            renderedData.rendered_text_ += "Message" + pConsumeWindowsEventLog->header_delimiter_ + " ";
            renderedData.rendered_text_ += message;
          }
        }

        if (pConsumeWindowsEventLog->writeXML_) {
          if (pConsumeWindowsEventLog->resolve_as_attributes_) {
            renderedData.matched_fields_ = walker.getFieldValues();
          }

          wel::XmlString writer;
          doc.print(writer,"", pugi::format_raw);  // no indentation or formatting
          xml = writer.xml_;

          renderedData.text_ = std::move(xml);
        }

        pConsumeWindowsEventLog->listRenderedData_.enqueue(std::move(renderedData));
        return 0UL;
      },
      EvtSubscribeToFutureEvents | EvtSubscribeStrict);

  if (!subscriptionHandle_) {
    logger_->log_error("Unable to subscribe with provided parameters, received the following error code: %d", GetLastError());
    return false;
  }

  // Start the inactivity clock used by onTrigger().
  lastActivityTimestamp_ = GetTickCount64();

  return true;
}
/**
 * Closes the event log subscription, if there is one, and forgets its handle.
 */
void ConsumeWindowsEventLog::unsubscribe()
{
  if (!subscriptionHandle_) {
    return;
  }

  EvtClose(subscriptionHandle_);
  subscriptionHandle_ = nullptr;
}
/**
 * Turns every rendered event currently queued in listRenderedData_ into flow files.
 *
 * Per event, one XML flow file is created when writeXML_ is set and one
 * plain text flow file when writePlainText_ is set; all of them are
 * transferred to Success. The session is committed after every
 * batch_commit_size_ events unless that value is 0.
 *
 * @param session session the flow files are created, written and transferred in
 * @return number of events taken from the queue (not the number of flow files)
 */
int ConsumeWindowsEventLog::processQueue(const std::shared_ptr<core::ProcessSession> &session)
{
  // Writes the bytes of a string into the flow file content. It only keeps a
  // pointer into the string, so the string must outlive the callback.
  struct WriteCallback: public OutputStreamCallback {
    explicit WriteCallback(const std::string& str)
      : data_(str.c_str()), size_(str.length()) {
    }

    int64_t process(std::shared_ptr<io::BaseStream> stream) {
      return stream->writeData(reinterpret_cast<uint8_t*>(const_cast<char*>(data_)), size_);
    }

    const char * data_;
    const size_t size_;
  };

  int flowFileCount = 0;

  const auto before_time = std::chrono::high_resolution_clock::now();
  // Logs the elapsed time when the function is left.
  utils::ScopeGuard timeGuard([&](){
    logger_->log_debug("processQueue processed %d Events in %llu ms",
                       flowFileCount,
                       std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::high_resolution_clock::now() - before_time).count());
  });

  EventRender evt;
  while (listRenderedData_.try_dequeue(evt)) {
    if (writeXML_) {
      auto flowFile = session->create();

      // A named callback object is used: taking the address of a temporary is
      // not valid standard C++.
      WriteCallback xmlWriter(evt.text_);
      session->write(flowFile, &xmlWriter);

      for (const auto &fieldMapping : evt.matched_fields_) {
        if (!fieldMapping.second.empty()) {
          session->putAttribute(flowFile, fieldMapping.first, fieldMapping.second);
        }
      }
      session->putAttribute(flowFile, FlowAttributeKey(MIME_TYPE), "application/xml");
      session->getProvenanceReporter()->receive(flowFile, provenanceUri_, getUUIDStr(), "Consume windows event logs", 0);
      session->transfer(flowFile, Success);
    }

    if (writePlainText_) {
      auto flowFile = session->create();

      WriteCallback textWriter(evt.rendered_text_);
      session->write(flowFile, &textWriter);

      session->putAttribute(flowFile, FlowAttributeKey(MIME_TYPE), "text/plain");
      session->getProvenanceReporter()->receive(flowFile, provenanceUri_, getUUIDStr(), "Consume windows event logs", 0);
      session->transfer(flowFile, Success);
    }

    flowFileCount++;

    if (batch_commit_size_ != 0U && (flowFileCount % batch_commit_size_ == 0)) {
      const auto before_commit = std::chrono::high_resolution_clock::now();
      session->commit();
      logger_->log_debug("processQueue commit took %llu ms",
                         std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::high_resolution_clock::now() - before_commit).count());
    }
  }

  return flowFileCount;
}
void ConsumeWindowsEventLog::notifyStop()
{
unsubscribe();
if (listRenderedData_.size_approx() != 0) {
auto session = sessionFactory_->createSession();
if (session) {
logger_->log_info("Finishing processing leftover events");
processQueue(session);
} else {
logger_->log_error(
"Stopping the processor but there is no ProcessSessionFactory stored and there are messages in the internal queue. "
"Removing the processor now will clear the queue but will result in DATA LOSS. This is normally due to starting the processor, "
"receiving events and stopping before the onTrigger happens. The messages in the internal queue cannot finish processing until "
"the processor is triggered to run.");
}
}
}
void ConsumeWindowsEventLog::LogWindowsError()
{
auto error_id = GetLastError();
LPVOID lpMsg;
FormatMessage(
FORMAT_MESSAGE_ALLOCATE_BUFFER |
FORMAT_MESSAGE_FROM_SYSTEM,
NULL,
error_id,
MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT),
(LPTSTR)&lpMsg,
0, NULL);
logger_->log_error("Error %d: %s\n", (int)error_id, (char *)lpMsg);
LocalFree(lpMsg);
}
} /* namespace processors */
} /* namespace minifi */
} /* namespace nifi */
} /* namespace apache */
} /* namespace org */