| /** |
| * @file TailFile.h |
| * TailFile class declaration |
| * |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| #pragma once |
| |
| #include <map> |
| #include <memory> |
| #include <utility> |
| #include <string> |
| #include <unordered_map> |
| #include <vector> |
| #include <optional> |
| |
| #include "controllers/AttributeProviderService.h" |
| #include "core/Core.h" |
| #include "core/ProcessorImpl.h" |
| #include "core/ProcessSession.h" |
| #include "core/PropertyDefinition.h" |
| #include "core/PropertyDefinitionBuilder.h" |
| #include "minifi-cpp/core/PropertyValidator.h" |
| #include "core/RelationshipDefinition.h" |
| #include "core/logging/LoggerFactory.h" |
| #include "utils/Enum.h" |
| #include "utils/Export.h" |
| #include "utils/RegexUtils.h" |
| |
| namespace org::apache::nifi::minifi::processors { |
| |
| enum class InitialStartPositions { |
| BEGINNING_OF_TIME, |
| BEGINNING_OF_FILE, |
| CURRENT_TIME |
| }; |
| |
| enum class TailResultFormat { |
| FlowFilePerDelimiter, |
| FlowFilePerBatch |
| }; |
| |
| } // namespace org::apache::nifi::minifi::processors |
| |
| namespace magic_enum::customize { |
| using InitialStartPositions = org::apache::nifi::minifi::processors::InitialStartPositions; |
| using TailResultFormat = org::apache::nifi::minifi::processors::TailResultFormat; |
| |
| template <> |
| constexpr customize_t enum_name<InitialStartPositions>(InitialStartPositions value) noexcept { |
| switch (value) { |
| case InitialStartPositions::BEGINNING_OF_TIME: |
| return "Beginning of Time"; |
| case InitialStartPositions::BEGINNING_OF_FILE: |
| return "Beginning of File"; |
| case InitialStartPositions::CURRENT_TIME: |
| return "Current Time"; |
| } |
| return invalid_tag; |
| } |
| |
| template<> |
| constexpr customize_t enum_name<TailResultFormat>(TailResultFormat value) noexcept { |
| switch (value) { |
| case TailResultFormat::FlowFilePerBatch: |
| return "Flow file per batch"; |
| case TailResultFormat::FlowFilePerDelimiter: |
| return "Flow file per delimiter"; |
| } |
| return invalid_tag; |
| } |
| } // namespace magic_enum::customize |
| |
| namespace org::apache::nifi::minifi::processors { |
| |
| struct TailState { |
| TailState(std::filesystem::path path, std::filesystem::path file_name, uint64_t position, |
| const std::chrono::file_clock::time_point last_read_time, |
| const uint64_t checksum, const bool is_rotated = false) |
| : path_(std::move(path)), file_name_(std::move(file_name)), position_(position), last_read_time_(last_read_time), checksum_(checksum), is_rotated_(is_rotated) {} |
| |
| TailState(std::filesystem::path path, std::filesystem::path file_name, const bool is_rotated = false) |
| : TailState{std::move(path), std::move(file_name), 0, std::chrono::file_clock::time_point{}, 0, is_rotated} {} |
| |
| TailState() = default; |
| |
| [[nodiscard]] std::filesystem::path fileNameWithPath() const { |
| return path_ / file_name_; |
| } |
| |
| [[nodiscard]] int64_t lastReadTimeInMilliseconds() const { |
| return std::chrono::duration_cast<std::chrono::milliseconds>(last_read_time_.time_since_epoch()).count(); |
| } |
| |
| std::filesystem::path path_; |
| std::filesystem::path file_name_; |
| uint64_t position_ = 0; |
| std::chrono::file_clock::time_point last_read_time_; |
| uint64_t checksum_ = 0; |
| bool is_rotated_ = false; |
| }; |
| |
| std::ostream& operator<<(std::ostream &os, const TailState &tail_state); |
| |
| enum class Mode { |
| SINGLE, MULTIPLE, UNDEFINED |
| }; |
| |
| class TailFile : public core::ProcessorImpl { |
| public: |
| using ProcessorImpl::ProcessorImpl; |
| |
| EXTENSIONAPI static constexpr const char* Description = "\"Tails\" a file, or a list of files, ingesting data from the file as it is written to the file. The file is expected to be textual." |
| " Data is ingested only when a new line is encountered (carriage return or new-line character or combination). If the file to tail is periodically \"rolled over\"," |
| " as is generally the case with log files, an optional Rolling Filename Pattern can be used to retrieve data from files that have rolled over, even if the rollover" |
| " occurred while NiFi was not running (provided that the data still exists upon restart of NiFi). It is generally advisable to set the Run Schedule to a few seconds," |
| " rather than running with the default value of 0 secs, as this Processor will consume a lot of resources if scheduled very aggressively. At this time, this Processor" |
| " does not support ingesting files that have been compressed when 'rolled over'."; |
| |
| EXTENSIONAPI static constexpr auto FileName = core::PropertyDefinitionBuilder<>::createProperty("File to Tail") |
| .withDescription("Fully-qualified filename of the file that should be tailed when using single file mode, or a file regex when using multifile mode") |
| .isRequired(true) |
| .build(); |
| EXTENSIONAPI static constexpr auto StateFile = core::PropertyDefinitionBuilder<>::createProperty("State File") |
| .withDescription("DEPRECATED. Only use it for state migration from the legacy state file.") |
| .isRequired(false) |
| .withDefaultValue("TailFileState") |
| .build(); |
| EXTENSIONAPI static constexpr auto Delimiter = core::PropertyDefinitionBuilder<>::createProperty("Input Delimiter") |
| .withDescription("Specifies the character that should be used for delimiting the data being tailed" |
| "from the incoming file. If none is specified, data will be ingested as it becomes available.") |
| .isRequired(false) |
| .withDefaultValue("\\n") |
| .build(); |
| EXTENSIONAPI static constexpr auto TailMode = core::PropertyDefinitionBuilder<2>::createProperty("tail-mode", "Tailing Mode") |
| .withDescription("Specifies the tail file mode. In 'Single file' mode only a single file will be watched. " |
| "In 'Multiple file' mode a regex may be used. Note that in multiple file mode we will still continue to watch for rollover on the initial set of watched files. " |
| "The Regex used to locate multiple files will be run during the schedule phrase. Note that if rotated files are matched by the regex, those files will be tailed.") |
| .isRequired(true) |
| .withAllowedValues({"Single file", "Multiple file"}) |
| .withDefaultValue("Single file") |
| .build(); |
| EXTENSIONAPI static constexpr auto BaseDirectory = core::PropertyDefinitionBuilder<>::createProperty("tail-base-directory", "Base Directory") |
| .withDescription("Base directory used to look for files to tail. This property is required when using Multiple file mode. " |
| "Can contain expression language placeholders if Attribute Provider Service is set.") |
| .isRequired(false) |
| .supportsExpressionLanguage(true) |
| .build(); |
| EXTENSIONAPI static constexpr auto RecursiveLookup = core::PropertyDefinitionBuilder<>::createProperty("Recursive lookup") |
| .withDescription("When using Multiple file mode, this property determines whether files are tailed in child directories of the Base Directory or not.") |
| .isRequired(false) |
| .withValidator(core::StandardPropertyValidators::BOOLEAN_VALIDATOR) |
| .withDefaultValue("false") |
| .build(); |
| EXTENSIONAPI static constexpr auto LookupFrequency = core::PropertyDefinitionBuilder<>::createProperty("Lookup frequency") |
| .withDescription("When using Multiple file mode, this property specifies the minimum duration " |
| "the processor will wait between looking for new files to tail in the Base Directory.") |
| .isRequired(false) |
| .withValidator(core::StandardPropertyValidators::TIME_PERIOD_VALIDATOR) |
| .withDefaultValue("10 min") |
| .build(); |
| EXTENSIONAPI static constexpr auto RollingFilenamePattern = core::PropertyDefinitionBuilder<>::createProperty("Rolling Filename Pattern") |
| .withDescription("If the file to tail \"rolls over\" as would be the case with log files, this filename pattern will be used to " |
| "identify files that have rolled over so MiNiFi can read the remaining of the rolled-over file and then continue with the new log file. " |
| "This pattern supports the wildcard characters * and ?, it also supports the notation ${filename} to specify a pattern based on the name of the file " |
| "(without extension), and will assume that the files that have rolled over live in the same directory as the file being tailed.") |
| .isRequired(false) |
| .withDefaultValue("${filename}.*") |
| .build(); |
| EXTENSIONAPI static constexpr auto InitialStartPosition = core::PropertyDefinitionBuilder<magic_enum::enum_count<InitialStartPositions>()>::createProperty("Initial Start Position") |
| .withDescription("When the Processor first begins to tail data, this property specifies where the Processor should begin reading data. " |
| "Once data has been ingested from a file, the Processor will continue from the last point from which it has received data.\n" |
| "Beginning of Time: Start with the oldest data that matches the Rolling Filename Pattern and then begin reading from the File to Tail.\n" |
| "Beginning of File: Start with the beginning of the File to Tail. Do not ingest any data that has already been rolled over.\n" |
| "Current Time: Start with the data at the end of the File to Tail. Do not ingest any data that has already been rolled over or " |
| "any data in the File to Tail that has already been written.") |
| .isRequired(true) |
| .withDefaultValue(magic_enum::enum_name(InitialStartPositions::BEGINNING_OF_FILE)) |
| .withAllowedValues(magic_enum::enum_names<InitialStartPositions>()) |
| .build(); |
| EXTENSIONAPI static constexpr auto AttributeProviderService = core::PropertyDefinitionBuilder<>::createProperty("Attribute Provider Service") |
| .withDescription("Provides a list of key-value pair records which can be used in the Base Directory property using Expression Language. Requires Multiple file mode.") |
| .withAllowedTypes<minifi::controllers::AttributeProviderService>() |
| .build(); |
| EXTENSIONAPI static constexpr auto BatchSize = core::PropertyDefinitionBuilder<>::createProperty("Batch Size") |
| .withDescription("Maximum number of lines emitted in a single trigger. If set to 0 all new content will be processed.") |
| .isRequired(true) |
| .withValidator(core::StandardPropertyValidators::UNSIGNED_INTEGER_VALIDATOR) |
| .withDefaultValue("0") |
| .build(); |
| EXTENSIONAPI static constexpr auto ResultFormat = core::PropertyDefinitionBuilder<magic_enum::enum_count<TailResultFormat>()>::createProperty("Result Mode") |
| .withDescription("Specifies how the result lines are arranged into output flow files") |
| .isRequired(true) |
| .withDefaultValue(magic_enum::enum_name(TailResultFormat::FlowFilePerDelimiter)) |
| .withAllowedValues(magic_enum::enum_names<TailResultFormat>()) |
| .build(); |
| EXTENSIONAPI static constexpr auto Properties = std::to_array<core::PropertyReference>({ |
| FileName, |
| StateFile, |
| Delimiter, |
| TailMode, |
| BaseDirectory, |
| RecursiveLookup, |
| LookupFrequency, |
| RollingFilenamePattern, |
| InitialStartPosition, |
| AttributeProviderService, |
| BatchSize, |
| ResultFormat |
| }); |
| |
| |
| EXTENSIONAPI static constexpr auto Success = core::RelationshipDefinition{"success", "All files are routed to success"}; |
| EXTENSIONAPI static constexpr auto Relationships = std::array{Success}; |
| |
| EXTENSIONAPI static constexpr bool SupportsDynamicProperties = false; |
| |
| EXTENSIONAPI static constexpr bool SupportsDynamicRelationships = false; |
| |
| EXTENSIONAPI static constexpr core::annotation::Input InputRequirement = core::annotation::Input::INPUT_FORBIDDEN; |
| |
| EXTENSIONAPI static constexpr bool IsSingleThreaded = true; |
| |
| ADD_COMMON_VIRTUAL_FUNCTIONS_FOR_PROCESSORS |
| |
| void onSchedule(core::ProcessContext& context, core::ProcessSessionFactory& sessionFactory) override; |
| void onTrigger(core::ProcessContext& context, core::ProcessSession& session) override; |
| |
| void initialize() override; |
| bool recoverState(const core::ProcessContext& context); |
| void logState(); |
| bool storeState() const; |
| std::chrono::milliseconds getLookupFrequency() const; |
| |
| private: |
| struct TailStateWithMtime { |
| using TimePoint = std::chrono::time_point<std::chrono::file_clock, std::chrono::seconds>; |
| |
| TailStateWithMtime(TailState tail_state, TimePoint mtime) |
| : tail_state_(std::move(tail_state)), mtime_(mtime) {} |
| |
| TailState tail_state_; |
| TimePoint mtime_; |
| }; |
| |
| void parseStateFileLine(char *buf, std::map<std::filesystem::path, TailState> &state) const; |
| void processAllRotatedFiles(core::ProcessSession& session, TailState &state); |
| void processRotatedFiles(core::ProcessSession& session, TailState &state, std::vector<TailState> &rotated_file_states); |
| void processRotatedFilesAfterLastReadTime(core::ProcessSession& session, TailState &state); |
| std::string parseRollingFilePattern(const TailState &state) const; |
| std::vector<TailState> findAllRotatedFiles(const TailState &state) const; |
| std::vector<TailState> findRotatedFilesAfterLastReadTime(const TailState &state) const; |
| static std::vector<TailState> sortAndSkipMainFilePrefix(const TailState &state, std::vector<TailStateWithMtime>& matched_files_with_mtime); |
| void processFile(core::ProcessSession& session, |
| const std::filesystem::path& full_file_name, |
| TailState &state); |
| void processSingleFile(core::ProcessSession& session, |
| const std::filesystem::path& full_file_name, |
| TailState &state); |
| bool getStateFromStateManager(std::map<std::filesystem::path, TailState> &new_tail_states) const; |
| bool getStateFromLegacyStateFile(const core::ProcessContext& context, |
| std::map<std::filesystem::path, TailState> &new_tail_states) const; |
| void doMultifileLookup(core::ProcessContext& context); |
| void checkForRemovedFiles(); |
| void checkForNewFiles(core::ProcessContext& context); |
| static std::string baseDirectoryFromAttributes(const controllers::AttributeProviderService::AttributeMap& attribute_map, core::ProcessContext& context); |
| void updateFlowFileAttributes(const std::filesystem::path& full_file_name, const TailState &state, const std::filesystem::path& fileName, |
| const std::string &baseName, const std::string &extension, |
| core::FlowFile& flow_file) const; |
| static void updateStateAttributes(TailState &state, uint64_t size, uint64_t checksum); |
| bool isOldFileInitiallyRead(const TailState &state) const; |
| |
| static constexpr int BUFFER_SIZE = 512; |
| |
| std::optional<char> delimiter_; // Delimiter for the data incoming from the tailed file. |
| core::StateManager* state_manager_ = nullptr; |
| std::map<std::filesystem::path, TailState> tail_states_; |
| Mode tail_mode_ = Mode::UNDEFINED; |
| std::optional<utils::Regex> pattern_regex_; |
| std::filesystem::path base_dir_; |
| bool recursive_lookup_ = false; |
| std::chrono::milliseconds lookup_frequency_{}; |
| std::chrono::steady_clock::time_point last_multifile_lookup_; |
| std::string rolling_filename_pattern_; |
| InitialStartPositions initial_start_position_; |
| bool first_trigger_{true}; |
| controllers::AttributeProviderService* attribute_provider_service_ = nullptr; |
| std::unordered_map<std::string, controllers::AttributeProviderService::AttributeMap> extra_attributes_; |
| std::optional<uint32_t> batch_size_; |
| size_t buffer_size_{}; |
| }; |
| |
| } // namespace org::apache::nifi::minifi::processors |
| |
| template <> |
| struct fmt::formatter<org::apache::nifi::minifi::processors::TailState> : fmt::ostream_formatter {}; |