| /** |
| * @file TailFile.cpp |
| * TailFile class implementation |
| * |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| #include <sys/types.h> |
| #include <sys/stat.h> |
| #include <time.h> |
| #include <stdio.h> |
| |
| #include <limits.h> |
| #ifndef WIN32 |
| #include <dirent.h> |
| #include <unistd.h> |
| #endif |
| #include <vector> |
| #include <queue> |
| #include <map> |
| #include <set> |
| #include <memory> |
| #include <algorithm> |
| #include <sstream> |
| #include <string> |
| #include <iostream> |
| #include "utils/file/FileUtils.h" |
| #include "utils/file/PathUtils.h" |
| #include "utils/TimeUtil.h" |
| #include "utils/StringUtils.h" |
| #include "utils/RegexUtils.h" |
| #ifdef HAVE_REGEX_CPP |
| #include <regex> |
| #else |
| #include <regex.h> |
| #endif |
| #include "TailFile.h" |
| #include "core/ProcessContext.h" |
| #include "core/ProcessSession.h" |
| |
| #ifndef S_ISDIR |
| #define S_ISDIR(mode) (((mode) & S_IFMT) == S_IFDIR) |
| #endif |
| |
| #if defined(__clang__) |
| #pragma clang diagnostic push |
| #pragma clang diagnostic ignored "-Wsign-compare" |
| #elif defined(__GNUC__) || defined(__GNUG__) |
| #pragma GCC diagnostic push |
| #pragma GCC diagnostic ignored "-Wsign-compare" |
| #endif |
| |
| namespace org { |
| namespace apache { |
| namespace nifi { |
| namespace minifi { |
| namespace processors { |
| |
| core::Property TailFile::FileName("File to Tail", "Fully-qualified filename of the file that should be tailed when using single file mode, or a file regex when using multifile mode", ""); |
| core::Property TailFile::StateFile("State File", "DEPRECATED. Only use it for state migration from the legacy state file.", |
| "TailFileState"); |
| core::Property TailFile::Delimiter("Input Delimiter", "Specifies the character that should be used for delimiting the data being tailed" |
| "from the incoming file." |
| "If none is specified, data will be ingested as it becomes available.", |
| ""); |
| |
| core::Property TailFile::TailMode( |
| core::PropertyBuilder::createProperty("tail-mode", "Tailing Mode")->withDescription( |
| "Specifies the tail file mode. In 'Single file' mode only a single file will be watched. " |
| "In 'Multiple file' mode a regex may be used. Note that in multiple file mode we will still continue to watch for rollover on the initial set of watched files. " |
| "The Regex used to locate multiple files will be run during the schedule phrase. Note that if rotated files are matched by the regex, those files will be tailed.")->isRequired(true) |
| ->withAllowableValue<std::string>("Single file")->withAllowableValue("Multiple file")->withDefaultValue("Single file")->build()); |
| |
| core::Property TailFile::BaseDirectory(core::PropertyBuilder::createProperty("tail-base-directory", "Base Directory")->isRequired(false)->build()); |
| |
| core::Relationship TailFile::Success("success", "All files are routed to success"); |
| |
| const char *TailFile::CURRENT_STR = "CURRENT."; |
| const char *TailFile::POSITION_STR = "POSITION."; |
| |
| void TailFile::initialize() { |
| // Set the supported properties |
| std::set<core::Property> properties; |
| properties.insert(FileName); |
| properties.insert(StateFile); |
| properties.insert(Delimiter); |
| properties.insert(TailMode); |
| properties.insert(BaseDirectory); |
| setSupportedProperties(properties); |
| // Set the supported relationships |
| std::set<core::Relationship> relationships; |
| relationships.insert(Success); |
| setSupportedRelationships(relationships); |
| } |
| |
| void TailFile::onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) { |
| std::lock_guard<std::mutex> tail_lock(tail_file_mutex_); |
| |
| // can perform these in notifyStop, but this has the same outcome |
| tail_states_.clear(); |
| state_recovered_ = false; |
| |
| state_manager_ = context->getStateManager(); |
| if (state_manager_ == nullptr) { |
| throw Exception(PROCESSOR_EXCEPTION, "Failed to get StateManager"); |
| } |
| |
| std::string value; |
| |
| if (context->getProperty(Delimiter.getName(), value)) { |
| delimiter_ = value; |
| } |
| |
| std::string mode; |
| context->getProperty(TailMode.getName(), mode); |
| |
| std::string file = ""; |
| if (!context->getProperty(FileName.getName(), file)) { |
| throw minifi::Exception(ExceptionType::PROCESSOR_EXCEPTION, "File to Tail is a required property"); |
| } |
| if (mode == "Multiple file") { |
| // file is a regex |
| std::string base_dir; |
| if (!context->getProperty(BaseDirectory.getName(), base_dir)) { |
| throw minifi::Exception(ExceptionType::PROCESSOR_EXCEPTION, "Base directory is required for multiple tail mode."); |
| } |
| |
| auto fileRegexSelect = [&](const std::string& path, const std::string& filename) -> bool { |
| if (acceptFile(file, filename)) { |
| tail_states_.insert(std::make_pair(filename, TailState {path, filename, 0, 0})); |
| } |
| return true; |
| }; |
| |
| utils::file::FileUtils::list_dir(base_dir, fileRegexSelect, logger_, false); |
| |
| } else { |
| std::string fileLocation, fileName; |
| if (utils::file::PathUtils::getFileNameAndPath(file, fileLocation, fileName)) { |
| tail_states_.insert(std::make_pair(fileName, TailState { fileLocation, fileName, 0, 0 })); |
| } else { |
| throw minifi::Exception(ExceptionType::PROCESSOR_EXCEPTION, "File to tail must be a fully qualified file"); |
| } |
| } |
| } |
| |
| bool TailFile::acceptFile(const std::string &fileFilter, const std::string &file) { |
| utils::Regex rgx(fileFilter); |
| return rgx.match(file); |
| } |
| |
| std::string TailFile::trimLeft(const std::string& s) { |
| return org::apache::nifi::minifi::utils::StringUtils::trimLeft(s); |
| } |
| |
| std::string TailFile::trimRight(const std::string& s) { |
| return org::apache::nifi::minifi::utils::StringUtils::trimRight(s); |
| } |
| |
| void TailFile::parseStateFileLine(char *buf) { |
| char *line = buf; |
| |
| logger_->log_trace("Received line %s", buf); |
| |
| while ((line[0] == ' ') || (line[0] == '\t')) |
| ++line; |
| |
| char first = line[0]; |
| if ((first == '\0') || (first == '#') || (first == '\r') || (first == '\n') || (first == '=')) { |
| return; |
| } |
| |
| char *equal = strchr(line, '='); |
| if (equal == NULL) { |
| return; |
| } |
| |
| equal[0] = '\0'; |
| std::string key = line; |
| |
| equal++; |
| while ((equal[0] == ' ') || (equal[0] == '\t')) |
| ++equal; |
| |
| first = equal[0]; |
| if ((first == '\0') || (first == '\r') || (first == '\n')) { |
| return; |
| } |
| |
| std::string value = equal; |
| key = trimRight(key); |
| value = trimRight(value); |
| |
| if (key == "FILENAME") { |
| std::string fileLocation, fileName; |
| if (utils::file::PathUtils::getFileNameAndPath(value, fileLocation, fileName)) { |
| logger_->log_debug("State migration received path %s, file %s", fileLocation, fileName); |
| tail_states_.insert(std::make_pair(fileName, TailState { fileLocation, fileName, 0, 0 })); |
| } else { |
| tail_states_.insert(std::make_pair(value, TailState { fileLocation, value, 0, 0 })); |
| } |
| } |
| if (key == "POSITION") { |
| // for backwards compatibility |
| if (tail_states_.size() != 1) { |
| throw minifi::Exception(ExceptionType::PROCESSOR_EXCEPTION, "Incompatible state file types"); |
| } |
| const auto position = std::stoull(value); |
| logger_->log_debug("Received position %d", position); |
| tail_states_.begin()->second.currentTailFilePosition_ = position; |
| } |
| if (key.find(CURRENT_STR) == 0) { |
| const auto file = key.substr(strlen(CURRENT_STR)); |
| std::string fileLocation, fileName; |
| if (utils::file::PathUtils::getFileNameAndPath(value, fileLocation, fileName)) { |
| tail_states_[file].path_ = fileLocation; |
| tail_states_[file].current_file_name_ = fileName; |
| } else { |
| throw minifi::Exception(ExceptionType::PROCESSOR_EXCEPTION, "State file contains an invalid file name"); |
| } |
| } |
| |
| if (key.find(POSITION_STR) == 0) { |
| const auto file = key.substr(strlen(POSITION_STR)); |
| tail_states_[file].currentTailFilePosition_ = std::stoull(value); |
| } |
| } |
| |
| |
| |
| bool TailFile::recoverState(const std::shared_ptr<core::ProcessContext>& context) { |
| bool state_load_success = false; |
| |
| std::unordered_map<std::string, std::string> state_map; |
| if (state_manager_->get(state_map)) { |
| std::map<std::string, TailState> new_tail_states; |
| for (size_t i = 0U;; i++) { |
| std::string name; |
| try { |
| name = state_map.at("file." + std::to_string(i) + ".name"); |
| } catch (...) { |
| break; |
| } |
| try { |
| const std::string& current = state_map.at("file." + std::to_string(i) + ".current"); |
| uint64_t position = std::stoull(state_map.at("file." + std::to_string(i) + ".position")); |
| |
| std::string fileLocation, fileName; |
| if (utils::file::PathUtils::getFileNameAndPath(current, fileLocation, fileName)) { |
| logger_->log_debug("Received path %s, file %s", fileLocation, fileName); |
| new_tail_states.emplace(fileName, TailState { fileLocation, fileName, position, 0 }); |
| } else { |
| new_tail_states.emplace(current, TailState { fileLocation, current, position, 0 }); |
| } |
| } catch (...) { |
| continue; |
| } |
| } |
| state_load_success = true; |
| tail_states_ = std::move(new_tail_states); |
| for (const auto& s : tail_states_) { |
| logger_->log_debug("TailState %s: %s, %s, %llu, %llu", s.first, s.second.path_, s.second.current_file_name_, s.second.currentTailFilePosition_, s.second.currentTailFileModificationTime_); |
| } |
| } else { |
| logger_->log_info("Found no stored state"); |
| } |
| |
| /* We could not get the state from the StateManager, try to migrate the old state file if it exists */ |
| if (!state_load_success) { |
| std::ifstream file(state_file_.c_str(), std::ifstream::in); |
| if (!file.good()) { |
| logger_->log_error("load state file failed %s", state_file_); |
| return false; |
| } |
| tail_states_.clear(); |
| char buf[BUFFER_SIZE]; |
| for (file.getline(buf, BUFFER_SIZE); file.good(); file.getline(buf, BUFFER_SIZE)) { |
| parseStateFileLine(buf); |
| } |
| } |
| |
| /** |
| * recover times and validate that we have paths |
| */ |
| |
| for (auto &state : tail_states_) { |
| std::string fileLocation, fileName; |
| if (!utils::file::PathUtils::getFileNameAndPath(state.second.current_file_name_, fileLocation, fileName) && state.second.path_.empty()) { |
| throw minifi::Exception(ExceptionType::PROCESSOR_EXCEPTION, "State file does not contain a full path and file name"); |
| } |
| struct stat sb; |
| const auto fileFullName = state.second.path_ + utils::file::FileUtils::get_separator() + state.second.current_file_name_; |
| if (stat(fileFullName.c_str(), &sb) == 0) { |
| state.second.currentTailFileModificationTime_ = ((uint64_t) (sb.st_mtime) * 1000); |
| } |
| } |
| |
| logger_->log_debug("load state succeeded"); |
| |
| /* Save the state to the state manager */ |
| storeState(context); |
| |
| return true; |
| } |
| |
| bool TailFile::storeState(const std::shared_ptr<core::ProcessContext>& context) { |
| std::unordered_map<std::string, std::string> state; |
| size_t i = 0; |
| for (const auto& tail_state : tail_states_) { |
| state["file." + std::to_string(i) + ".name"] = tail_state.first; |
| state["file." + std::to_string(i) + ".current"] = utils::file::FileUtils::concat_path(tail_state.second.path_, tail_state.second.current_file_name_); |
| state["file." + std::to_string(i) + ".position"] = std::to_string(tail_state.second.currentTailFilePosition_); |
| ++i; |
| } |
| if (!state_manager_->set(state)) { |
| logger_->log_error("Failed to set state"); |
| return false; |
| } |
| return true; |
| } |
| |
| static bool sortTailMatchedFileItem(TailMatchedFileItem i, TailMatchedFileItem j) { |
| return (i.modifiedTime < j.modifiedTime); |
| } |
| void TailFile::checkRollOver(const std::shared_ptr<core::ProcessContext>& context, TailState &file, const std::string &base_file_name) { |
| struct stat statbuf; |
| std::vector<TailMatchedFileItem> matchedFiles; |
| std::string fullPath = file.path_ + utils::file::FileUtils::get_separator() + file.current_file_name_; |
| |
| if (stat(fullPath.c_str(), &statbuf) == 0) { |
| logger_->log_trace("Searching for files rolled over"); |
| std::string pattern = file.current_file_name_; |
| std::size_t found = file.current_file_name_.find_last_of("."); |
| if (found != std::string::npos) |
| pattern = file.current_file_name_.substr(0, found); |
| |
| // Callback, called for each file entry in the listed directory |
| // Return value is used to break (false) or continue (true) listing |
| auto lambda = [&](const std::string& path, const std::string& filename) -> bool { |
| struct stat sb; |
| std::string fileFullName = path + utils::file::FileUtils::get_separator() + filename; |
| if ((fileFullName.find(pattern) != std::string::npos) && stat(fileFullName.c_str(), &sb) == 0) { |
| uint64_t candidateModTime = ((uint64_t) (sb.st_mtime) * 1000); |
| if (candidateModTime >= file.currentTailFileModificationTime_) { |
| logging::LOG_TRACE(logger_) << "File " << filename << " (short name " << file.current_file_name_ << |
| ") disk mod time " << candidateModTime << ", struct mod time " << file.currentTailFileModificationTime_ << ", size on disk " << sb.st_size << ", position " << file.currentTailFilePosition_; |
| if (filename == file.current_file_name_ && candidateModTime == file.currentTailFileModificationTime_ && |
| sb.st_size == file.currentTailFilePosition_) { |
| return true; // Skip the current file as a candidate in case it wasn't updated |
| } |
| TailMatchedFileItem item; |
| item.fileName = filename; |
| item.modifiedTime = ((uint64_t) (sb.st_mtime) * 1000); |
| matchedFiles.push_back(item); |
| } |
| } |
| return true;}; |
| |
| utils::file::FileUtils::list_dir(file.path_, lambda, logger_, false); |
| |
| if (matchedFiles.size() < 1) { |
| logger_->log_debug("No newer files found in directory!"); |
| return; |
| } |
| |
| // Sort the list based on modified time |
| std::sort(matchedFiles.begin(), matchedFiles.end(), sortTailMatchedFileItem); |
| TailMatchedFileItem item = matchedFiles[0]; |
| logger_->log_info("TailFile File Roll Over from %s to %s", file.current_file_name_, item.fileName); |
| |
| // Going ahead in the file rolled over |
| if (file.current_file_name_ != base_file_name) { |
| logger_->log_debug("Resetting posotion since %s != %s", base_file_name, file.current_file_name_); |
| file.currentTailFilePosition_ = 0; |
| } |
| |
| file.current_file_name_ = item.fileName; |
| |
| storeState(context); |
| } |
| } |
| |
| void TailFile::onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) { |
| std::lock_guard<std::mutex> tail_lock(tail_file_mutex_); |
| std::string st_file; |
| if (context->getProperty(StateFile.getName(), st_file)) { |
| state_file_ = st_file + "." + getUUIDStr(); |
| } |
| if (!this->state_recovered_) { |
| state_recovered_ = true; |
| // recover the state if we have not done so |
| this->recoverState(context); |
| } |
| |
| /** |
| * iterate over file states. may modify them |
| */ |
| for (auto &state : tail_states_) { |
| auto fileLocation = state.second.path_; |
| |
| checkRollOver(context, state.second, state.first); |
| std::string fullPath = fileLocation + utils::file::FileUtils::get_separator() + state.second.current_file_name_; |
| struct stat statbuf; |
| |
| logger_->log_debug("Tailing file %s from %llu", fullPath, state.second.currentTailFilePosition_); |
| if (stat(fullPath.c_str(), &statbuf) == 0) { |
| if ((uint64_t) statbuf.st_size <= state.second.currentTailFilePosition_) { |
| logger_->log_trace("Current pos: %llu", state.second.currentTailFilePosition_); |
| logger_->log_trace("%s", "there are no new input for the current tail file"); |
| context->yield(); |
| return; |
| } |
| std::size_t found = state.first.find_last_of("."); |
| std::string baseName = state.first.substr(0, found); |
| std::string extension = state.first.substr(found + 1); |
| |
| if (!delimiter_.empty()) { |
| char delim = delimiter_.c_str()[0]; |
| if (delim == '\\') { |
| if (delimiter_.size() > 1) { |
| switch (delimiter_.c_str()[1]) { |
| case 'r': |
| delim = '\r'; |
| break; |
| case 't': |
| delim = '\t'; |
| break; |
| case 'n': |
| delim = '\n'; |
| break; |
| case '\\': |
| delim = '\\'; |
| break; |
| default: |
| // previous behavior |
| break; |
| } |
| } |
| } |
| logger_->log_debug("Looking for delimiter 0x%X", delim); |
| std::vector<std::shared_ptr<FlowFileRecord>> flowFiles; |
| session->import(fullPath, flowFiles, state.second.currentTailFilePosition_, delim); |
| logger_->log_info("%u flowfiles were received from TailFile input", flowFiles.size()); |
| |
| for (auto ffr : flowFiles) { |
| logger_->log_info("TailFile %s for %u bytes", state.first, ffr->getSize()); |
| std::string logName = baseName + "." + std::to_string(state.second.currentTailFilePosition_) + "-" + std::to_string(state.second.currentTailFilePosition_ + ffr->getSize()) + "." + extension; |
| ffr->updateKeyedAttribute(PATH, fileLocation); |
| ffr->addKeyedAttribute(ABSOLUTE_PATH, fullPath); |
| ffr->updateKeyedAttribute(FILENAME, logName); |
| session->transfer(ffr, Success); |
| state.second.currentTailFilePosition_ += ffr->getSize() + 1; |
| storeState(context); |
| } |
| |
| } else { |
| std::shared_ptr<FlowFileRecord> flowFile = std::static_pointer_cast<FlowFileRecord>(session->create()); |
| if (flowFile) { |
| flowFile->updateKeyedAttribute(PATH, fileLocation); |
| flowFile->addKeyedAttribute(ABSOLUTE_PATH, fullPath); |
| session->import(fullPath, flowFile, true, state.second.currentTailFilePosition_); |
| session->transfer(flowFile, Success); |
| logger_->log_info("TailFile %s for %llu bytes", state.first, flowFile->getSize()); |
| std::string logName = baseName + "." + std::to_string(state.second.currentTailFilePosition_) + "-" + std::to_string(state.second.currentTailFilePosition_ + flowFile->getSize()) + "." |
| + extension; |
| flowFile->updateKeyedAttribute(FILENAME, logName); |
| state.second.currentTailFilePosition_ += flowFile->getSize(); |
| storeState(context); |
| } |
| } |
| state.second.currentTailFileModificationTime_ = ((uint64_t) (statbuf.st_mtime) * 1000); |
| } else { |
| logger_->log_warn("Unable to stat file %s", fullPath); |
| } |
| } |
| } |
| |
| } /* namespace processors */ |
| } /* namespace minifi */ |
| } /* namespace nifi */ |
| } /* namespace apache */ |
| } /* namespace org */ |
| |
| #if defined(__clang__) |
| #pragma clang diagnostic pop |
| #elif defined(__GNUC__) || defined(__GNUG__) |
| #pragma GCC diagnostic pop |
| #endif |