| /** |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| #pragma once |
| |
| #include <Windows.h> |
| #include <winevt.h> |
| #include <Objbase.h> |
| |
#include <array>
#include <codecvt>
#include <cstdint>
#include <functional>
#include <map>
#include <memory>
#include <mutex>
#include <optional>
#include <regex>
#include <sstream>
#include <string>
#include <string_view>
#include <tuple>
#include <unordered_map>
| |
| #include "core/Core.h" |
| #include "core/ProcessorImpl.h" |
| #include "core/ProcessSession.h" |
| #include "core/PropertyDefinition.h" |
| #include "core/PropertyDefinitionBuilder.h" |
| #include "minifi-cpp/core/PropertyValidator.h" |
| #include "core/RelationshipDefinition.h" |
| #include "core/StateManager.h" |
| #include "utils/OsUtils.h" |
| #include "wel/WindowsEventLog.h" |
| #include "wel/EventPath.h" |
| #include "FlowFileRecord.h" |
| #include "concurrentqueue.h" |
| #include "pugixml.hpp" |
| #include "utils/Enum.h" |
| #include "utils/Export.h" |
| #include "utils/RegexUtils.h" |
| |
namespace org::apache::nifi::minifi::wel {

// Only held through std::unique_ptr in this header, so a forward declaration is sufficient here.
class Bookmark;

/**
 * The rendered forms of one Windows event.
 * NOTE: keep the member order stable — instances may be aggregate-initialized by callers.
 */
struct ProcessedEvent {
  std::map<std::string, std::string> matched_fields;  // presumably the fields matched by 'Identifier Match Regex' — verify in the .cpp
  std::string xml;
  std::string plaintext;
  std::string json;
};

/**
 * Output format of the produced FlowFiles.
 * The enumerator names (and their order) are used via magic_enum as the allowed values of the
 * "Output Format" property, so renaming or reordering them changes the configuration interface.
 */
enum class OutputFormat {
  XML,
  Both,  // DEPRECATED (XML + Plaintext); hidden from the documentation but accepted for backwards compatibility
  Plaintext,
  JSON
};

/**
 * Flavour of JSON emitted when OutputFormat::JSON is selected.
 * As with OutputFormat, the names/order are exposed as the "JSON Format" property's allowed values.
 */
enum class JsonFormat { Raw, Simple, Flattened };

/**
 * Creates the predicate that decides whether a field name should be treated as a SID.
 * @param sid_matcher the optional matcher expression (presumably the 'Identifier Match Regex' property value);
 *                    how std::nullopt is handled is defined in the implementation, which is not visible here.
 */
auto parseSidMatcher(const std::optional<std::string>& sid_matcher) -> std::function<bool(std::string_view)>;

}  // namespace org::apache::nifi::minifi::wel
| |
| namespace org::apache::nifi::minifi::processors { |
| |
| class ConsumeWindowsEventLog : public core::ProcessorImpl { |
| public: |
| explicit ConsumeWindowsEventLog(core::ProcessorMetadata metadata); |
| |
| ~ConsumeWindowsEventLog() override; |
| |
| EXTENSIONAPI static constexpr const char* Description = "Registers a Windows Event Log Subscribe Callback to receive FlowFiles from Events on Windows. These can be filtered via channel and XPath."; |
| |
| EXTENSIONAPI static constexpr auto Channel = core::PropertyDefinitionBuilder<>::createProperty("Channel") |
| .isRequired(true) |
| .withDefaultValue("System") |
| .withDescription("The Windows Event Log Channel to listen to. In order to process logs from a log file use the format 'SavedLog:\\<file path\\>'.") |
| .supportsExpressionLanguage(true) |
| .build(); |
| EXTENSIONAPI static constexpr auto Query = core::PropertyDefinitionBuilder<>::createProperty("Query") |
| .isRequired(true) |
| .withDefaultValue("*") |
| .withDescription("XPath Query to filter events. (See https://msdn.microsoft.com/en-us/library/windows/desktop/dd996910(v=vs.85).aspx for examples.)") |
| .supportsExpressionLanguage(true) |
| .build(); |
| EXTENSIONAPI static constexpr auto MaxBufferSize = core::PropertyDefinitionBuilder<>::createProperty("Max Buffer Size") |
| .isRequired(true) |
| .withValidator(core::StandardPropertyValidators::DATA_SIZE_VALIDATOR) |
| .withDefaultValue("1 MB") |
| .withDescription("The individual Event Log XMLs are rendered to a buffer." |
| " This specifies the maximum size in bytes that the buffer will be allowed to grow to. (Limiting the maximum size of an individual Event XML.)") |
| .build(); |
| // !!! This property is obsolete since now subscription is not used, but leave since it might be is used already in config.yml. |
| EXTENSIONAPI static constexpr auto InactiveDurationToReconnect = core::PropertyDefinitionBuilder<>::createProperty("Inactive Duration To Reconnect") |
| .isRequired(true) |
| .withValidator(core::StandardPropertyValidators::TIME_PERIOD_VALIDATOR) |
| .withDefaultValue("10 min") |
| .withDescription("If no new event logs are processed for the specified time period, " |
| " this processor will try reconnecting to recover from a state where any further messages cannot be consumed." |
| " Such situation can happen if Windows Event Log service is restarted, or ERROR_EVT_QUERY_RESULT_STALE (15011) is returned." |
| " Setting no duration, e.g. '0 ms' disables auto-reconnection.") |
| .build(); |
| EXTENSIONAPI static constexpr auto IdentifierMatcher = core::PropertyDefinitionBuilder<>::createProperty("Identifier Match Regex") |
| .isRequired(false) |
| .withDefaultValue(".*Sid") |
| .withDescription("Regular Expression to match Subject Identifier Fields. These will be placed into the attributes of the FlowFile") |
| .build(); |
| EXTENSIONAPI static constexpr auto IdentifierFunction = core::PropertyDefinitionBuilder<>::createProperty("Apply Identifier Function") |
| .isRequired(false) |
| .withValidator(core::StandardPropertyValidators::BOOLEAN_VALIDATOR) |
| .withDefaultValue("true") |
| .withDescription("If true it will resolve SIDs matched in the 'Identifier Match Regex' to the DOMAIN\\USERNAME associated with that ID") |
| .build(); |
| EXTENSIONAPI static constexpr auto ResolveAsAttributes = core::PropertyDefinitionBuilder<>::createProperty("Resolve Metadata in Attributes") |
| .isRequired(false) |
| .withValidator(core::StandardPropertyValidators::BOOLEAN_VALIDATOR) |
| .withDefaultValue("true") |
| .withDescription("If true, any metadata that is resolved ( such as IDs or keyword metadata ) will be placed into attributes, otherwise it will be replaced in the XML or text output") |
| .build(); |
| EXTENSIONAPI static constexpr auto EventHeaderDelimiter = core::PropertyDefinitionBuilder<>::createProperty("Event Header Delimiter") |
| .isRequired(false) |
| .withDescription("If set, the chosen delimiter will be used in the Event output header. Otherwise, a colon followed by spaces will be used.") |
| .build(); |
| EXTENSIONAPI static constexpr auto EventHeader = core::PropertyDefinitionBuilder<>::createProperty("Event Header") |
| .isRequired(false) |
| .withDefaultValue("LOG_NAME=Log Name, SOURCE = Source, TIME_CREATED = Date,EVENT_RECORDID=Record ID,EVENTID = Event ID," |
| "TASK_CATEGORY = Task Category,LEVEL = Level,KEYWORDS = Keywords,USER = User,COMPUTER = Computer, EVENT_TYPE = EventType") |
| .withDescription("Comma seperated list of key/value pairs with the following keys LOG_NAME, SOURCE, TIME_CREATED,EVENT_RECORDID," |
| "EVENTID,TASK_CATEGORY,LEVEL,KEYWORDS,USER,COMPUTER, and EVENT_TYPE. Eliminating fields will remove them from the header.") |
| .build(); |
| EXTENSIONAPI static constexpr auto OutputFormatProperty = core::PropertyDefinitionBuilder<magic_enum::enum_count<wel::OutputFormat>()>::createProperty("Output Format") |
| .isRequired(true) |
| .withDefaultValue(magic_enum::enum_name(wel::OutputFormat::XML)) |
| .withAllowedValues(magic_enum::enum_names<wel::OutputFormat>()) |
| .withDescription("The format of the output flow files.") |
| .build(); |
| EXTENSIONAPI static constexpr auto JsonFormatProperty = core::PropertyDefinitionBuilder<magic_enum::enum_count<wel::JsonFormat>()>::createProperty("JSON Format") |
| .isRequired(true) |
| .withDefaultValue(magic_enum::enum_name(wel::JsonFormat::Simple)) |
| .withAllowedValues(magic_enum::enum_names<wel::JsonFormat>()) |
| .withDescription("Set the json format type. Only applicable if Output Format is set to 'JSON'") |
| .build(); |
| EXTENSIONAPI static constexpr auto BatchCommitSize = core::PropertyDefinitionBuilder<>::createProperty("Batch Commit Size") |
| .isRequired(false) |
| .withValidator(core::StandardPropertyValidators::UNSIGNED_INTEGER_VALIDATOR) |
| .withDefaultValue("1000") |
| .withDescription("Maximum number of Events to consume and create to Flow Files from before committing.") |
| .build(); |
| EXTENSIONAPI static constexpr auto BookmarkRootDirectory = core::PropertyDefinitionBuilder<>::createProperty("State Directory") |
| .isRequired(false) |
| .withDefaultValue("CWELState") |
| .withDescription("DEPRECATED. Only use it for state migration from the state file, supplying the legacy state directory.") |
| .build(); |
| EXTENSIONAPI static constexpr auto ProcessOldEvents = core::PropertyDefinitionBuilder<>::createProperty("Process Old Events") |
| .isRequired(true) |
| .withValidator(core::StandardPropertyValidators::BOOLEAN_VALIDATOR) |
| .withDefaultValue("false") |
| .withDescription("This property defines if old events (which are created before first time server is started) should be processed.") |
| .build(); |
| EXTENSIONAPI static constexpr auto CacheSidLookups = core::PropertyDefinitionBuilder<>::createProperty("Cache SID Lookups") |
| .isRequired(false) |
| .withValidator(core::StandardPropertyValidators::BOOLEAN_VALIDATOR) |
| .withDefaultValue("true") |
| .withDescription("Determines whether SID to name lookups are cached in memory") |
| .build(); |
| EXTENSIONAPI static constexpr auto Properties = std::to_array<core::PropertyReference>({ |
| Channel, |
| Query, |
| MaxBufferSize, |
| InactiveDurationToReconnect, |
| IdentifierMatcher, |
| IdentifierFunction, |
| ResolveAsAttributes, |
| EventHeaderDelimiter, |
| EventHeader, |
| OutputFormatProperty, |
| JsonFormatProperty, |
| BatchCommitSize, |
| BookmarkRootDirectory, |
| ProcessOldEvents, |
| CacheSidLookups |
| }); |
| |
| |
| EXTENSIONAPI static constexpr auto Success = core::RelationshipDefinition{"success", "Relationship for successfully consumed events."}; |
| EXTENSIONAPI static constexpr auto Relationships = std::array{Success}; |
| |
| EXTENSIONAPI static constexpr bool SupportsDynamicProperties = false; |
| EXTENSIONAPI static constexpr bool SupportsDynamicRelationships = false; |
| EXTENSIONAPI static constexpr core::annotation::Input InputRequirement = core::annotation::Input::INPUT_FORBIDDEN; |
| EXTENSIONAPI static constexpr bool IsSingleThreaded = true; |
| |
| ADD_COMMON_VIRTUAL_FUNCTIONS_FOR_PROCESSORS |
| |
| void onSchedule(core::ProcessContext& context, core::ProcessSessionFactory& session_factory) override; |
| void onTrigger(core::ProcessContext& context, core::ProcessSession& session) override; |
| void initialize() override; |
| void notifyStop() override; |
| |
| private: |
| void refreshTimeZoneData(); |
| void createAndCommitFlowFile(const wel::ProcessedEvent& processed_event, core::ProcessSession& session) const; |
| wel::WindowsEventLogProvider& getEventLogProvider(const std::string& name); |
| wel::HeaderNames createHeaderNames(const std::optional<std::string>& event_header_property) const; |
| nonstd::expected<wel::ProcessedEvent, std::string> processEvent(EVT_HANDLE event_handle); |
| void substituteXMLPercentageItems(pugi::xml_document& doc); |
| std::function<std::string(const std::string&)> userIdToUsernameFunction() const; |
| nonstd::expected<std::string, std::string> renderEventAsXml(EVT_HANDLE event_handle); |
| bool commitAndSaveBookmark(const std::wstring& bookmarkXml, core::ProcessContext& context, core::ProcessSession& session); |
| std::tuple<size_t, std::wstring> processEventLogs(core::ProcessSession& session, EVT_HANDLE event_query_results); |
| std::unique_ptr<wel::Bookmark> createBookmark(const core::ProcessContext& context) const; |
| |
| core::StateManager* state_manager_{nullptr}; |
| wel::HeaderNames header_names_; |
| std::optional<std::string> header_delimiter_; |
| wel::EventPath path_; |
| std::wstring wstr_query_; |
| std::function<bool(std::string_view)> sid_matcher_; |
| bool resolve_as_attributes_{false}; |
| bool apply_identifier_function_{false}; |
| std::string provenanceUri_; |
| std::string computerName_; |
| uint64_t max_buffer_size_{}; |
| std::map<std::string, wel::WindowsEventLogProvider> providers_; |
| uint64_t batch_commit_size_{}; |
| bool cache_sid_lookups_ = true; |
| |
| wel::OutputFormat output_format_; |
| wel::JsonFormat json_format_; |
| |
| std::unique_ptr<wel::Bookmark> bookmark_; |
| std::mutex on_trigger_mutex_; |
| std::unordered_map<std::string, std::string> xmlPercentageItemsResolutions_; |
| HMODULE hMsobjsDll_{}; |
| |
| std::string timezone_name_; |
| std::string timezone_offset_; // Represented as UTC offset in (+|-)HH:MM format, like +02:00 |
| }; |
| |
| } // namespace org::apache::nifi::minifi::processors |