/**
 * @file TailFile.cpp
 * TailFile class implementation
 *
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include <algorithm>
#include <cinttypes>
#include <cstdint>
#include <iostream>
#include <limits>
#include <map>
#include <unordered_map>
#include <memory>
#include <set>
#include <string>
#include <utility>
#include <vector>

#include "io/CRCStream.h"
#include "utils/file/FileUtils.h"
#include "utils/file/PathUtils.h"
#include "utils/TimeUtil.h"
#include "utils/StringUtils.h"
#include "utils/RegexUtils.h"
#include "TailFile.h"
#include "core/ProcessContext.h"
#include "core/ProcessSession.h"

namespace org {
namespace apache {
namespace nifi {
namespace minifi {
namespace processors {

core::Property TailFile::FileName(
    core::PropertyBuilder::createProperty("File to Tail")
        ->withDescription("Fully-qualified filename of the file that should be tailed when using single file mode, or a file regex when using multifile mode")
        ->isRequired(true)
        ->build());

core::Property TailFile::StateFile(
    core::PropertyBuilder::createProperty("State File")
        ->withDescription("DEPRECATED. Only use it for state migration from the legacy state file.")
        ->isRequired(false)
        ->withDefaultValue<std::string>("TailFileState")
        ->build());

core::Property TailFile::Delimiter(
    core::PropertyBuilder::createProperty("Input Delimiter")
        ->withDescription("Specifies the character that should be used for delimiting the data being tailed"
         "from the incoming file. If none is specified, data will be ingested as it becomes available.")
        ->isRequired(false)
        ->withDefaultValue<std::string>("\\n")
        ->build());

core::Property TailFile::TailMode(
    core::PropertyBuilder::createProperty("tail-mode", "Tailing Mode")
        ->withDescription("Specifies the tail file mode. In 'Single file' mode only a single file will be watched. "
        "In 'Multiple file' mode a regex may be used. Note that in multiple file mode we will still continue to watch for rollover on the initial set of watched files. "
        "The Regex used to locate multiple files will be run during the schedule phrase. Note that if rotated files are matched by the regex, those files will be tailed.")->isRequired(true)
        ->withAllowableValue<std::string>("Single file")->withAllowableValue("Multiple file")->withDefaultValue("Single file")
        ->build());

core::Property TailFile::BaseDirectory(
    core::PropertyBuilder::createProperty("tail-base-directory", "Base Directory")
        ->withDescription("Base directory used to look for files to tail. This property is required when using Multiple file mode.")
        ->isRequired(false)
        ->build());

core::Property TailFile::RecursiveLookup(
    core::PropertyBuilder::createProperty("Recursive lookup")
        ->withDescription("When using Multiple file mode, this property determines whether files are tailed in "
        "child directories of the Base Directory or not.")
        ->isRequired(false)
        ->withDefaultValue<bool>(false)
        ->build());

core::Property TailFile::LookupFrequency(
    core::PropertyBuilder::createProperty("Lookup frequency")
        ->withDescription("When using Multiple file mode, this property specifies the minimum duration "
        "the processor will wait between looking for new files to tail in the Base Directory.")
        ->isRequired(false)
        ->withDefaultValue<core::TimePeriodValue>("10 min")
        ->build());

core::Property TailFile::RollingFilenamePattern(
    core::PropertyBuilder::createProperty("Rolling Filename Pattern")
        ->withDescription("If the file to tail \"rolls over\" as would be the case with log files, this filename pattern will be used to "
        "identify files that have rolled over so MiNiFi can read the remaining of the rolled-over file and then continue with the new log file. "
        "This pattern supports the wildcard characters * and ?, it also supports the notation ${filename} to specify a pattern based on the name of the file "
        "(without extension), and will assume that the files that have rolled over live in the same directory as the file being tailed.")
        ->isRequired(false)
        ->withDefaultValue<std::string>("${filename}.*")
        ->build());

core::Relationship TailFile::Success("success", "All files are routed to success");

const char *TailFile::CURRENT_STR = "CURRENT.";
const char *TailFile::POSITION_STR = "POSITION.";

namespace {
template<typename Container, typename Key>
bool containsKey(const Container &container, const Key &key) {
  return container.find(key) != container.end();
}

template <typename Container, typename Key>
int64_t readOptionalInt64(const Container &container, const Key &key) {
  const auto it = container.find(key);
  if (it != container.end()) {
    return std::stoll(it->second);
  } else {
    return 0;
  }
}

template <typename Container, typename Key>
uint64_t readOptionalUint64(const Container &container, const Key &key) {
  const auto it = container.find(key);
  if (it != container.end()) {
    return std::stoull(it->second);
  } else {
    return 0;
  }
}

// the delimiter is the first character of the input, allowing some escape sequences
std::string parseDelimiter(const std::string &input) {
  if (input.empty()) return "";
  if (input[0] != '\\') return std::string{ input[0] };
  if (input.size() == std::size_t{1}) return "\\";
  switch (input[1]) {
    case 'r': return "\r";
    case 't': return "\t";
    case 'n': return "\n";
    default: return std::string{ input[1] };
  }
}

std::map<std::string, TailState> update_keys_in_legacy_states(const std::map<std::string, TailState> &legacy_tail_states) {
  std::map<std::string, TailState> new_tail_states;
  for (const auto &key_value_pair : legacy_tail_states) {
    const TailState &state = key_value_pair.second;
    std::string full_file_name = utils::file::FileUtils::concat_path(state.path_, state.file_name_);
    new_tail_states.emplace(full_file_name, state);
  }
  return new_tail_states;
}

struct TailStateWithMtime {
  using TimePoint = std::chrono::time_point<std::chrono::system_clock, std::chrono::seconds>;

  TailStateWithMtime(TailState tail_state, TimePoint mtime)
    : tail_state_(std::move(tail_state)), mtime_(mtime) {}

  TailState tail_state_;
  TimePoint mtime_;
};

void openFile(const std::string &file_name, uint64_t offset, std::ifstream &input_stream, const std::shared_ptr<logging::Logger> &logger) {
  logger->log_debug("Opening %s", file_name);
  input_stream.open(file_name.c_str(), std::fstream::in | std::fstream::binary);
  if (!input_stream.is_open() || !input_stream.good()) {
    input_stream.close();
    throw Exception(FILE_OPERATION_EXCEPTION, "Could not open file: " + file_name);
  }
  if (offset != 0U) {
    input_stream.seekg(offset, std::ifstream::beg);
    if (!input_stream.good()) {
      logger->log_error("Seeking to %" PRIu64 " failed for file %s (does file/filesystem support seeking?)", offset, file_name);
      throw Exception(FILE_OPERATION_EXCEPTION, "Could not seek file " + file_name + " to offset " + std::to_string(offset));
    }
  }
}

constexpr std::size_t BUFFER_SIZE = 4096;

class FileReaderCallback : public OutputStreamCallback {
 public:
  FileReaderCallback(const std::string &file_name,
                     uint64_t offset,
                     char input_delimiter,
                     uint64_t checksum)
    : input_delimiter_(input_delimiter),
      checksum_(checksum),
      logger_(logging::LoggerFactory<TailFile>::getLogger()) {
    openFile(file_name, offset, input_stream_, logger_);
  }

  int64_t process(std::shared_ptr<io::BaseStream> output_stream) override {
    io::CRCStream<io::BaseStream> crc_stream{gsl::make_not_null(output_stream.get()), checksum_};

    uint64_t num_bytes_written = 0;
    bool found_delimiter = false;

    while (hasMoreToRead() && !found_delimiter) {
      if (begin_ == end_) {
        input_stream_.read(reinterpret_cast<char *>(buffer_.data()), buffer_.size());

        const auto num_bytes_read = input_stream_.gcount();
        logger_->log_trace("Read %jd bytes of input", std::intmax_t{num_bytes_read});

        begin_ = buffer_.data();
        end_ = begin_ + num_bytes_read;
      }

      char *delimiter_pos = std::find(begin_, end_, input_delimiter_);
      found_delimiter = (delimiter_pos != end_);

      ptrdiff_t zlen{std::distance(begin_, delimiter_pos)};
      if (found_delimiter) {
        zlen += 1;
      }
      const int len = gsl::narrow<int>(zlen);

      crc_stream.write(reinterpret_cast<uint8_t*>(begin_), len);
      num_bytes_written += len;
      begin_ += len;
    }

    if (found_delimiter) {
      checksum_ = crc_stream.getCRC();
    } else {
      latest_flow_file_ends_with_delimiter_ = false;
    }

    return num_bytes_written;
  }

  uint64_t checksum() const {
    return checksum_;
  }

  bool hasMoreToRead() const {
    return begin_ != end_ || input_stream_.good();
  }

  bool useLatestFlowFile() const {
    return latest_flow_file_ends_with_delimiter_;
  }

 private:
  char input_delimiter_;
  uint64_t checksum_;
  std::ifstream input_stream_;
  std::shared_ptr<logging::Logger> logger_;

  std::array<char, BUFFER_SIZE> buffer_;
  char *begin_ = buffer_.data();
  char *end_ = buffer_.data();

  bool latest_flow_file_ends_with_delimiter_ = true;
};

class WholeFileReaderCallback : public OutputStreamCallback {
 public:
  WholeFileReaderCallback(const std::string &file_name,
                          uint64_t offset,
                          uint64_t checksum)
    : checksum_(checksum),
      logger_(logging::LoggerFactory<TailFile>::getLogger()) {
    openFile(file_name, offset, input_stream_, logger_);
  }

  uint64_t checksum() const {
    return checksum_;
  }

  int64_t process(std::shared_ptr<io::BaseStream> output_stream) override {
    std::array<char, BUFFER_SIZE> buffer;

    io::CRCStream<io::BaseStream> crc_stream{gsl::make_not_null(output_stream.get()), checksum_};

    uint64_t num_bytes_written = 0;

    while (input_stream_.good()) {
      input_stream_.read(buffer.data(), buffer.size());

      const auto num_bytes_read = input_stream_.gcount();
      logger_->log_trace("Read %jd bytes of input", std::intmax_t{num_bytes_read});

      const int len = gsl::narrow<int>(num_bytes_read);

      crc_stream.write(reinterpret_cast<uint8_t*>(buffer.data()), len);
      num_bytes_written += len;
    }

    checksum_ = crc_stream.getCRC();

    return num_bytes_written;
  }

 private:
  uint64_t checksum_;
  std::ifstream input_stream_;
  std::shared_ptr<logging::Logger> logger_;
};
}  // namespace

void TailFile::initialize() {
  // Set the supported properties
  std::set<core::Property> properties;
  properties.insert(FileName);
  properties.insert(StateFile);
  properties.insert(Delimiter);
  properties.insert(TailMode);
  properties.insert(BaseDirectory);
  properties.insert(RecursiveLookup);
  properties.insert(LookupFrequency);
  properties.insert(RollingFilenamePattern);
  setSupportedProperties(properties);
  // Set the supported relationships
  std::set<core::Relationship> relationships;
  relationships.insert(Success);
  setSupportedRelationships(relationships);
}

void TailFile::onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) {
  std::lock_guard<std::mutex> tail_lock(tail_file_mutex_);

  tail_states_.clear();

  state_manager_ = context->getStateManager();
  if (state_manager_ == nullptr) {
    throw Exception(PROCESSOR_EXCEPTION, "Failed to get StateManager");
  }

  std::string value;

  if (context->getProperty(Delimiter.getName(), value)) {
    delimiter_ = parseDelimiter(value);
  }

  context->getProperty(FileName.getName(), file_to_tail_);

  std::string mode;
  context->getProperty(TailMode.getName(), mode);

  if (mode == "Multiple file") {
    tail_mode_ = Mode::MULTIPLE;

    if (!context->getProperty(BaseDirectory.getName(), base_dir_)) {
      throw minifi::Exception(ExceptionType::PROCESSOR_EXCEPTION, "Base directory is required for multiple tail mode.");
    }

    if (utils::file::FileUtils::is_directory(base_dir_.c_str()) == 0) {
      throw minifi::Exception(ExceptionType::PROCESSOR_EXCEPTION, "Base directory does not exist or is not a directory");
    }

    context->getProperty(RecursiveLookup.getName(), recursive_lookup_);

    // NOTE:
    //   context->getProperty(LookupFrequency.getName(), lookup_frequency_);
    // is incorrect, as std::chrono::milliseconds::rep is unspecified, and may not be supported by getProperty()
    // (e.g. in clang/libc++, this underlying type is long long)
    int64_t lookup_frequency;
    if (context->getProperty(LookupFrequency.getName(), lookup_frequency)) {
      lookup_frequency_ = std::chrono::milliseconds{lookup_frequency};
    }

    recoverState(context);

    doMultifileLookup();

  } else {
    tail_mode_ = Mode::SINGLE;

    std::string path, file_name;
    if (utils::file::getFileNameAndPath(file_to_tail_, path, file_name)) {
      // NOTE: position and checksum will be updated in recoverState() if there is a persisted state for this file
      tail_states_.emplace(file_to_tail_, TailState{path, file_name});
    } else {
      throw minifi::Exception(ExceptionType::PROCESSOR_EXCEPTION, "File to tail must be a fully qualified file");
    }

    recoverState(context);
  }

  std::string rolling_filename_pattern_glob;
  context->getProperty(RollingFilenamePattern.getName(), rolling_filename_pattern_glob);
  rolling_filename_pattern_ = utils::file::globToRegex(rolling_filename_pattern_glob);
}

void TailFile::parseStateFileLine(char *buf, std::map<std::string, TailState> &state) const {
  char *line = buf;

  logger_->log_trace("Received line %s", buf);

  while ((line[0] == ' ') || (line[0] == '\t'))
    ++line;

  char first = line[0];
  if ((first == '\0') || (first == '#') || (first == '\r') || (first == '\n') || (first == '=')) {
    return;
  }

  char *equal = strchr(line, '=');
  if (equal == nullptr) {
    return;
  }

  equal[0] = '\0';
  std::string key = line;

  equal++;
  while ((equal[0] == ' ') || (equal[0] == '\t'))
    ++equal;

  first = equal[0];
  if ((first == '\0') || (first == '\r') || (first == '\n')) {
    return;
  }

  std::string value = equal;
  key = utils::StringUtils::trimRight(key);
  value = utils::StringUtils::trimRight(value);

  if (key == "FILENAME") {
    std::string fileLocation, fileName;
    if (utils::file::getFileNameAndPath(value, fileLocation, fileName)) {
      logger_->log_debug("State migration received path %s, file %s", fileLocation, fileName);
      state.emplace(fileName, TailState{fileLocation, fileName});
    } else {
      state.emplace(value, TailState{fileLocation, value});
    }
  }
  if (key == "POSITION") {
    // for backwards compatibility
    if (tail_states_.size() != std::size_t{1}) {
      throw minifi::Exception(ExceptionType::PROCESSOR_EXCEPTION, "Incompatible state file types");
    }
    const auto position = std::stoull(value);
    logger_->log_debug("Received position %llu", position);
    state.begin()->second.position_ = gsl::narrow<uint64_t>(position);
  }
  if (key.find(CURRENT_STR) == 0) {
    const auto file = key.substr(strlen(CURRENT_STR));
    std::string fileLocation, fileName;
    if (utils::file::getFileNameAndPath(value, fileLocation, fileName)) {
      state[file].path_ = fileLocation;
      state[file].file_name_ = fileName;
    } else {
      throw minifi::Exception(ExceptionType::PROCESSOR_EXCEPTION, "State file contains an invalid file name");
    }
  }

  if (key.find(POSITION_STR) == 0) {
    const auto file = key.substr(strlen(POSITION_STR));
    state[file].position_ = std::stoull(value);
  }
}

bool TailFile::recoverState(const std::shared_ptr<core::ProcessContext>& context) {
  std::map<std::string, TailState> new_tail_states;
  bool state_load_success = getStateFromStateManager(new_tail_states) ||
                            getStateFromLegacyStateFile(context, new_tail_states);
  if (!state_load_success) {
    return false;
  }

  if (tail_mode_ == Mode::SINGLE) {
    if (tail_states_.size() == 1) {
      auto state_it = tail_states_.begin();
      const auto it = new_tail_states.find(state_it->first);
      if (it != new_tail_states.end()) {
        state_it->second = it->second;
      }
    } else {
      throw minifi::Exception(ExceptionType::PROCESSOR_EXCEPTION, "This should never happen: "
          "in Single file mode, internal state size should be 1, but it is " + std::to_string(tail_states_.size()));
    }
  } else {
    tail_states_ = std::move(new_tail_states);
  }

  logState();
  storeState();

  return true;
}

bool TailFile::getStateFromStateManager(std::map<std::string, TailState> &new_tail_states) const {
  std::unordered_map<std::string, std::string> state_map;
  if (state_manager_->get(state_map)) {
    for (size_t i = 0U;; i++) {
      std::string name;
      try {
        name = state_map.at("file." + std::to_string(i) + ".name");
      } catch (...) {
        break;
      }
      try {
        const std::string& current = state_map.at("file." + std::to_string(i) + ".current");
        uint64_t position = std::stoull(state_map.at("file." + std::to_string(i) + ".position"));
        uint64_t checksum = readOptionalUint64(state_map, "file." + std::to_string(i) + ".checksum");
        std::chrono::system_clock::time_point last_read_time{std::chrono::milliseconds{
            readOptionalInt64(state_map, "file." + std::to_string(i) + ".last_read_time")
        }};

        std::string fileLocation, fileName;
        if (utils::file::getFileNameAndPath(current, fileLocation, fileName)) {
          logger_->log_debug("Received path %s, file %s", fileLocation, fileName);
          new_tail_states.emplace(current, TailState{fileLocation, fileName, position, last_read_time, checksum});
        } else {
          new_tail_states.emplace(current, TailState{fileLocation, current, position, last_read_time, checksum});
        }
      } catch (...) {
        continue;
      }
    }
    for (const auto& s : tail_states_) {
      logger_->log_debug("TailState %s: %s, %s, %" PRIu64 ", %" PRIu64,
                         s.first, s.second.path_, s.second.file_name_, s.second.position_, s.second.checksum_);
    }
    return true;
  } else {
    logger_->log_info("Found no stored state");
  }
  return false;
}

bool TailFile::getStateFromLegacyStateFile(const std::shared_ptr<core::ProcessContext>& context,
                                           std::map<std::string, TailState> &new_tail_states) const {
  std::string state_file_name_property;
  context->getProperty(StateFile.getName(), state_file_name_property);
  std::string state_file = state_file_name_property + "." + getUUIDStr();

  std::ifstream file(state_file.c_str(), std::ifstream::in);
  if (!file.good()) {
    logger_->log_info("Legacy state file %s not found (this is OK)", state_file);
    return false;
  }

  std::map<std::string, TailState> legacy_tail_states;
  char buf[BUFFER_SIZE];
  for (file.getline(buf, BUFFER_SIZE); file.good(); file.getline(buf, BUFFER_SIZE)) {
    parseStateFileLine(buf, legacy_tail_states);
  }

  new_tail_states = update_keys_in_legacy_states(legacy_tail_states);
  return true;
}

void TailFile::logState() {
  logger_->log_info("State of the TailFile processor %s:", name_);
  for (const auto& key_value_pair : tail_states_) {
    logging::LOG_INFO(logger_) << key_value_pair.first << " => { " << key_value_pair.second << " }";
  }
}

std::ostream& operator<<(std::ostream &os, const TailState &tail_state) {
  os << "name: " << tail_state.file_name_
      << ", position: " << tail_state.position_
      << ", checksum: " << tail_state.checksum_
      << ", last_read_time: " << tail_state.lastReadTimeInMilliseconds();
  return os;
}

bool TailFile::storeState() {
  std::unordered_map<std::string, std::string> state;
  size_t i = 0;
  for (const auto& tail_state : tail_states_) {
    state["file." + std::to_string(i) + ".current"] = tail_state.first;
    state["file." + std::to_string(i) + ".name"] = tail_state.second.file_name_;
    state["file." + std::to_string(i) + ".position"] = std::to_string(tail_state.second.position_);
    state["file." + std::to_string(i) + ".checksum"] = std::to_string(tail_state.second.checksum_);
    state["file." + std::to_string(i) + ".last_read_time"] = std::to_string(tail_state.second.lastReadTimeInMilliseconds());
    ++i;
  }
  if (!state_manager_->set(state)) {
    logger_->log_error("Failed to set state");
    return false;
  }
  return true;
}

std::vector<TailState> TailFile::findRotatedFiles(const TailState &state) const {
  logger_->log_debug("Searching for files rolled over; last read time is %" PRId64, state.lastReadTimeInMilliseconds());

  std::size_t last_dot_position = state.file_name_.find_last_of('.');
  std::string base_name = state.file_name_.substr(0, last_dot_position);
  std::string pattern = utils::StringUtils::replaceOne(rolling_filename_pattern_, "${filename}", base_name);

  std::vector<TailStateWithMtime> matched_files_with_mtime;
  auto collect_matching_files = [&](const std::string &path, const std::string &file_name) -> bool {
    if (file_name != state.file_name_ && utils::Regex::matchesFullInput(pattern, file_name)) {
      std::string full_file_name = path + utils::file::FileUtils::get_separator() + file_name;
      TailStateWithMtime::TimePoint mtime{utils::file::FileUtils::last_write_time_point(full_file_name)};
      logger_->log_debug("File %s with mtime %" PRId64 " matches rolling filename pattern %s", file_name, int64_t{mtime.time_since_epoch().count()}, pattern);
      if (mtime >= std::chrono::time_point_cast<std::chrono::seconds>(state.last_read_time_)) {
        logger_->log_debug("File %s has mtime >= last read time, so we are going to read it", file_name);
        matched_files_with_mtime.emplace_back(TailState{path, file_name}, mtime);
      }
    }
    return true;
  };

  utils::file::FileUtils::list_dir(state.path_, collect_matching_files, logger_, false);

  std::sort(matched_files_with_mtime.begin(), matched_files_with_mtime.end(), [](const TailStateWithMtime &left, const TailStateWithMtime &right) {
    return std::tie(left.mtime_, left.tail_state_.file_name_) <
           std::tie(right.mtime_, right.tail_state_.file_name_);
  });

  if (!matched_files_with_mtime.empty() && state.position_ > 0) {
    TailState &first_rotated_file = matched_files_with_mtime[0].tail_state_;
    std::string full_file_name = first_rotated_file.fileNameWithPath();
    if (utils::file::FileUtils::file_size(full_file_name) >= state.position_) {
      uint64_t checksum = utils::file::FileUtils::computeChecksum(full_file_name, state.position_);
      if (checksum == state.checksum_) {
        first_rotated_file.position_ = state.position_;
        first_rotated_file.checksum_ = state.checksum_;
      }
    }
  }

  std::vector<TailState> matched_files;
  matched_files.reserve(matched_files_with_mtime.size());
  std::transform(matched_files_with_mtime.begin(), matched_files_with_mtime.end(), std::back_inserter(matched_files),
                 [](TailStateWithMtime &tail_state_with_mtime) { return std::move(tail_state_with_mtime.tail_state_); });
  return matched_files;
}

void TailFile::onTrigger(const std::shared_ptr<core::ProcessContext> &, const std::shared_ptr<core::ProcessSession> &session) {
  std::lock_guard<std::mutex> tail_lock(tail_file_mutex_);

  if (tail_mode_ == Mode::MULTIPLE) {
    if (last_multifile_lookup_ + lookup_frequency_ < std::chrono::steady_clock::now()) {
      logger_->log_debug("Lookup frequency %" PRId64 " ms have elapsed, doing new multifile lookup", int64_t{lookup_frequency_.count()});
      doMultifileLookup();
    } else {
      logger_->log_trace("Skipping multifile lookup");
    }
  }

  // iterate over file states. may modify them
  for (auto &state : tail_states_) {
    processFile(session, state.first, state.second);
  }

  if (!session->existsFlowFileInRelationship(Success)) {
    yield();
  }
}

void TailFile::processFile(const std::shared_ptr<core::ProcessSession> &session,
                           const std::string &full_file_name,
                           TailState &state) {
  uint64_t fsize = utils::file::FileUtils::file_size(full_file_name);
  if (fsize < state.position_) {
    processRotatedFiles(session, state);
  } else if (fsize == state.position_) {
    logger_->log_trace("Skipping file %s as its size hasn't change since last read", state.file_name_);
    return;
  }

  processSingleFile(session, full_file_name, state);
}

void TailFile::processRotatedFiles(const std::shared_ptr<core::ProcessSession> &session, TailState &state) {
    std::vector<TailState> rotated_file_states = findRotatedFiles(state);
    for (TailState &file_state : rotated_file_states) {
      processSingleFile(session, file_state.fileNameWithPath(), file_state);
    }
    state.position_ = 0;
    state.checksum_ = 0;
}

void TailFile::processSingleFile(const std::shared_ptr<core::ProcessSession> &session,
                                 const std::string &full_file_name,
                                 TailState &state) {
  std::string fileName = state.file_name_;

  if (utils::file::FileUtils::file_size(full_file_name) == 0u) {
    logger_->log_warn("Unable to read file %s as it does not exist or has size zero", full_file_name);
    return;
  }
  logger_->log_debug("Tailing file %s from %" PRIu64, full_file_name, state.position_);

  std::size_t last_dot_position = fileName.find_last_of('.');
  std::string baseName = fileName.substr(0, last_dot_position);
  std::string extension = fileName.substr(last_dot_position + 1);

  if (!delimiter_.empty()) {
    char delim = delimiter_[0];
    logger_->log_trace("Looking for delimiter 0x%X", delim);

    std::size_t num_flow_files = 0;
    FileReaderCallback file_reader{full_file_name, state.position_, delim, state.checksum_};
    TailState state_copy{state};

    while (file_reader.hasMoreToRead()) {
      auto flow_file = session->create();
      session->write(flow_file, &file_reader);

      if (file_reader.useLatestFlowFile()) {
        updateFlowFileAttributes(full_file_name, state_copy, fileName, baseName, extension, flow_file);
        session->transfer(flow_file, Success);
        updateStateAttributes(state_copy, flow_file->getSize(), file_reader.checksum());

        ++num_flow_files;

      } else {
        session->remove(flow_file);
      }
    }

    state = state_copy;
    storeState();

    logger_->log_info("%zu flowfiles were received from TailFile input", num_flow_files);

  } else {
    WholeFileReaderCallback file_reader{full_file_name, state.position_, state.checksum_};
    auto flow_file = session->create();
    session->write(flow_file, &file_reader);

    updateFlowFileAttributes(full_file_name, state, fileName, baseName, extension, flow_file);
    session->transfer(flow_file, Success);
    updateStateAttributes(state, flow_file->getSize(), file_reader.checksum());

    storeState();
  }
}

void TailFile::updateFlowFileAttributes(const std::string &full_file_name, const TailState &state,
                                        const std::string &fileName, const std::string &baseName,
                                        const std::string &extension,
                                        std::shared_ptr<core::FlowFile> &flow_file) const {
  logger_->log_info("TailFile %s for %" PRIu64 " bytes", fileName, flow_file->getSize());
  std::string logName = baseName + "." + std::to_string(state.position_) + "-" +
                        std::to_string(state.position_ + flow_file->getSize() - 1) + "." + extension;
  flow_file->setAttribute(core::SpecialFlowAttribute::PATH, state.path_);
  flow_file->addAttribute(core::SpecialFlowAttribute::ABSOLUTE_PATH, full_file_name);
  flow_file->setAttribute(core::SpecialFlowAttribute::FILENAME, logName);
}

void TailFile::updateStateAttributes(TailState &state, uint64_t size, uint64_t checksum) const {
  state.position_ += size;
  state.last_read_time_ = std::chrono::system_clock::now();
  state.checksum_ = checksum;
}

void TailFile::doMultifileLookup() {
  checkForRemovedFiles();
  checkForNewFiles();
  last_multifile_lookup_ = std::chrono::steady_clock::now();
}

void TailFile::checkForRemovedFiles() {
  std::vector<std::string> file_names_to_remove;

  for (const auto &kv : tail_states_) {
    const std::string &full_file_name = kv.first;
    const TailState &state = kv.second;
    if (utils::file::FileUtils::file_size(state.fileNameWithPath()) == 0u ||
        !utils::Regex::matchesFullInput(file_to_tail_, state.file_name_)) {
      file_names_to_remove.push_back(full_file_name);
    }
  }

  for (const auto &full_file_name : file_names_to_remove) {
    tail_states_.erase(full_file_name);
  }
}

void TailFile::checkForNewFiles() {
  auto add_new_files_callback = [&](const std::string &path, const std::string &file_name) -> bool {
    std::string full_file_name = path + utils::file::FileUtils::get_separator() + file_name;
    if (!containsKey(tail_states_, full_file_name) && utils::Regex::matchesFullInput(file_to_tail_, file_name)) {
      tail_states_.emplace(full_file_name, TailState{path, file_name});
    }
    return true;
  };

  utils::file::FileUtils::list_dir(base_dir_, add_new_files_callback, logger_, recursive_lookup_);
}

std::chrono::milliseconds TailFile::getLookupFrequency() const {
  return lookup_frequency_;
}

}  // namespace processors
}  // namespace minifi
}  // namespace nifi
}  // namespace apache
}  // namespace org
