/**
 * @file TailFile.cpp
 * TailFile class implementation
 *
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
#include <sys/time.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <time.h>
#include <stdio.h>
#include <dirent.h>
#include <limits.h>
#include <unistd.h>
#include <cstring>
#include <fstream>
#include <vector>
#include <queue>
#include <map>
#include <set>
#include <memory>
#include <mutex>
#include <algorithm>
#include <sstream>
#include <stdexcept>
#include <string>
#include <iostream>
#include "utils/TimeUtil.h"
#include "utils/StringUtils.h"
#include "processors/TailFile.h"
#include "core/ProcessContext.h"
#include "core/ProcessSession.h"

#if defined(__clang__)
#pragma clang diagnostic push
#pragma clang diagnostic ignored "-Wsign-compare"
#elif defined(__GNUC__) || defined(__GNUG__)
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wsign-compare"
#endif

namespace org {
namespace apache {
namespace nifi {
namespace minifi {
namespace processors {

// Path of the file to tail; onTrigger splits it into a directory and a file name.
core::Property TailFile::FileName("File to Tail", "Fully-qualified filename of the file that should be tailed", "");
// Base path of the state file; onTrigger appends "." followed by the processor UUID.
core::Property TailFile::StateFile("State File", "Specifies the file that should be used for storing state about"
                                   " what data has been ingested so that upon restart NiFi can resume from where it left off",
                                   "TailFileState");
// Only the first character of the configured value is used as the delimiter.
core::Property TailFile::Delimiter("Input Delimiter", "Specifies the character that should be used for delimiting the data being tailed"
                                   " from the incoming file.",
                                   "");
core::Relationship TailFile::Success("success", "All files are routed to success");

void TailFile::initialize() {
  // Set the supported properties
  std::set<core::Property> properties;
  properties.insert(FileName);
  properties.insert(StateFile);
  properties.insert(Delimiter);
  setSupportedProperties(properties);
  // Set the supported relationships
  std::set<core::Relationship> relationships;
  relationships.insert(Success);
  setSupportedRelationships(relationships);
}

void TailFile::onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory) {
  // Only the delimiter is cached at schedule time; the file and state-file
  // properties are read again on every trigger.
  std::string configuredDelimiter;
  const bool present = context->getProperty(Delimiter.getName(), configuredDelimiter);
  if (!present) {
    return;
  }
  _delimiter = configuredDelimiter;
}

std::string TailFile::trimLeft(const std::string& s) {
  return org::apache::nifi::minifi::utils::StringUtils::trimLeft(s);
}

std::string TailFile::trimRight(const std::string& s) {
  return org::apache::nifi::minifi::utils::StringUtils::trimRight(s);
}

void TailFile::parseStateFileLine(char *buf) {
  char *line = buf;

  while ((line[0] == ' ') || (line[0] == '\t'))
    ++line;

  char first = line[0];
  if ((first == '\0') || (first == '#') || (first == '\r') || (first == '\n') || (first == '=')) {
    return;
  }

  char *equal = strchr(line, '=');
  if (equal == NULL) {
    return;
  }

  equal[0] = '\0';
  std::string key = line;

  equal++;
  while ((equal[0] == ' ') || (equal[0] == '\t'))
    ++equal;

  first = equal[0];
  if ((first == '\0') || (first == '\r') || (first == '\n')) {
    return;
  }

  std::string value = equal;
  key = trimRight(key);
  value = trimRight(value);

  if (key == "FILENAME")
    this->_currentTailFileName = value;
  if (key == "POSITION")
    this->_currentTailFilePosition = std::stoi(value);

  return;
}

/**
 * Restores the tail file name and position from the state file, if it can be opened.
 *
 * Every line is handed to parseStateFileLine(). The file is read with std::getline.
 * A fixed-size istream::getline loop that stops on !good() would drop a final line
 * with no trailing newline (eofbit). It would also abort the whole read on a line
 * longer than its buffer (failbit); std::getline avoids both.
 */
void TailFile::recoverState() {
  std::ifstream file(_stateFile.c_str(), std::ifstream::in);
  if (!file.good()) {
    logger_->log_error("load state file failed %s", _stateFile);
    return;
  }
  std::string line;
  while (std::getline(file, line)) {
    // parseStateFileLine edits its argument in place, so give it a mutable NUL-terminated copy.
    std::vector<char> buf(line.begin(), line.end());
    buf.push_back('\0');
    parseStateFileLine(buf.data());
  }
}

void TailFile::storeState() {
  std::ofstream file(_stateFile.c_str());
  if (!file.is_open()) {
    logger_->log_error("store state file failed %s", _stateFile);
    return;
  }
  file << "FILENAME=" << this->_currentTailFileName << "\n";
  file << "POSITION=" << this->_currentTailFilePosition << "\n";
  file.close();
}

static bool sortTailMatchedFileItem(TailMatchedFileItem i, TailMatchedFileItem j) {
  return (i.modifiedTime < j.modifiedTime);
}
void TailFile::checkRollOver(const std::string &fileLocation, const std::string &fileName) {
  struct stat statbuf;
  std::vector<TailMatchedFileItem> matchedFiles;
  std::string fullPath = fileLocation + "/" + _currentTailFileName;

  if (stat(fullPath.c_str(), &statbuf) == 0) {
    if (statbuf.st_size > this->_currentTailFilePosition)
      // there are new input for the current tail file
      return;

    uint64_t modifiedTimeCurrentTailFile = ((uint64_t) (statbuf.st_mtime) * 1000);
    std::string pattern = fileName;
    std::size_t found = fileName.find_last_of(".");
    if (found != std::string::npos)
      pattern = fileName.substr(0, found);
    DIR *d;
    d = opendir(fileLocation.c_str());
    if (!d)
      return;
    while (1) {
      struct dirent *entry;
      entry = readdir(d);
      if (!entry)
        break;
      std::string d_name = entry->d_name;
      if (!(entry->d_type & DT_DIR)) {
        std::string fileName = d_name;
        std::string fileFullName = fileLocation + "/" + d_name;
        if (fileFullName.find(pattern) != std::string::npos && stat(fileFullName.c_str(), &statbuf) == 0) {
          if (((uint64_t) (statbuf.st_mtime) * 1000) >= modifiedTimeCurrentTailFile) {
            TailMatchedFileItem item;
            item.fileName = fileName;
            item.modifiedTime = ((uint64_t) (statbuf.st_mtime) * 1000);
            matchedFiles.push_back(item);
          }
        }
      }
    }
    closedir(d);

    // Sort the list based on modified time
    std::sort(matchedFiles.begin(), matchedFiles.end(), sortTailMatchedFileItem);
    for (std::vector<TailMatchedFileItem>::iterator it = matchedFiles.begin(); it != matchedFiles.end(); ++it) {
      TailMatchedFileItem item = *it;
      if (item.fileName == _currentTailFileName) {
        ++it;
        if (it != matchedFiles.end()) {
          TailMatchedFileItem nextItem = *it;
          logger_->log_info("TailFile File Roll Over from %s to %s", _currentTailFileName, nextItem.fileName);
          _currentTailFileName = nextItem.fileName;
          _currentTailFilePosition = 0;
          storeState();
        }
        break;
      }
    }
  } else {
    return;
  }
}

/**
 * Splits a configured path into its directory and file-name parts at the last '/' or '\\'.
 * A path without any separator is treated as a file in the current directory ("."),
 * instead of using the whole path for both parts.
 */
static void splitTailFilePath(const std::string &path, std::string &location, std::string &name) {
  const std::size_t separator = path.find_last_of("/\\");
  if (separator == std::string::npos) {
    location = ".";
    name = path;
  } else {
    location = path.substr(0, separator);
    name = path.substr(separator + 1);
  }
}

/**
 * Builds the FILENAME attribute for a chunk of a tailed file.
 * The result is "<base>.<start>-<end>.<ext>" for "<base>.<ext>". For a name without
 * any '.', it is "<name>.<start>-<end>", so the whole name is not repeated as an extension.
 */
static std::string makeTailChunkName(const std::string &fileName, uint64_t start, uint64_t end) {
  const std::size_t dot = fileName.find_last_of('.');
  std::string chunkName = fileName.substr(0, dot);
  chunkName += "." + std::to_string(start) + "-" + std::to_string(end);
  if (dot != std::string::npos) {
    chunkName += fileName.substr(dot);  // includes the leading '.'
  }
  return chunkName;
}

/**
 * Emits any data appended to the tailed file since the last recorded position.
 *
 * On the first trigger the persisted state (if any) is recovered; then roll-over is checked.
 * With a delimiter configured, the new data is imported as one flow file per
 * delimited chunk; otherwise everything after the current position becomes a
 * single flow file. The position is advanced and persisted afterwards.
 * Yields when there is nothing new or no file name is configured.
 */
void TailFile::onTrigger(core::ProcessContext *context, core::ProcessSession *session) {
  std::lock_guard<std::mutex> tail_lock(tail_file_mutex_);
  std::string value;
  std::string fileLocation;
  std::string fileName;
  if (context->getProperty(FileName.getName(), value)) {
    splitTailFilePath(value, fileLocation, fileName);
  }
  if (fileName.empty()) {
    // Without a file name the tail path would resolve to the directory itself.
    logger_->log_error("TailFile: property %s does not name a file", FileName.getName());
    context->yield();
    return;
  }
  if (context->getProperty(StateFile.getName(), value)) {
    _stateFile = value + "." + getUUIDStr();
  }
  if (!_stateRecovered) {
    _stateRecovered = true;
    _currentTailFileName = fileName;
    _currentTailFilePosition = 0;
    // recover the state if we have not done so
    recoverState();
  }
  checkRollOver(fileLocation, fileName);

  const std::string fullPath = fileLocation + "/" + _currentTailFileName;
  struct stat statbuf;
  if (stat(fullPath.c_str(), &statbuf) != 0) {
    logger_->log_warn("Unable to stat file %s", fullPath);
    return;
  }
  if (static_cast<uint64_t>(statbuf.st_size) <= _currentTailFilePosition) {
    // there is no new input for the current tail file
    context->yield();
    return;
  }

  if (!_delimiter.empty()) {
    const char delim = _delimiter[0];
    std::vector<std::shared_ptr<FlowFileRecord>> flowFiles;
    session->import(fullPath, flowFiles, true, _currentTailFilePosition, delim);
    logger_->log_info("%llu flowfiles were received from TailFile input", static_cast<unsigned long long>(flowFiles.size()));

    for (const auto &ffr : flowFiles) {
      const uint64_t start = _currentTailFilePosition;
      logger_->log_info("TailFile %s for %llu bytes", _currentTailFileName, ffr->getSize());
      ffr->updateKeyedAttribute(PATH, fileLocation);
      ffr->addKeyedAttribute(ABSOLUTE_PATH, fullPath);
      ffr->updateKeyedAttribute(FILENAME, makeTailChunkName(_currentTailFileName, start, start + ffr->getSize()));
      session->transfer(ffr, Success);
      // Skip the chunk plus its delimiter byte.
      // NOTE(review): assumes import() consumed a trailing delimiter after every chunk,
      // including the last one — confirm against ProcessSession::import.
      _currentTailFilePosition += ffr->getSize() + 1;
    }
    // Persist once after the batch; the intermediate states were immediately overwritten anyway.
    if (!flowFiles.empty()) {
      storeState();
    }
  } else {
    std::shared_ptr<FlowFileRecord> flowFile = std::static_pointer_cast<FlowFileRecord>(session->create());
    if (!flowFile)
      return;
    flowFile->updateKeyedAttribute(PATH, fileLocation);
    flowFile->addKeyedAttribute(ABSOLUTE_PATH, fullPath);
    session->import(fullPath, flowFile, true, _currentTailFilePosition);
    const uint64_t start = _currentTailFilePosition;
    flowFile->updateKeyedAttribute(FILENAME, makeTailChunkName(_currentTailFileName, start, start + flowFile->getSize()));
    session->transfer(flowFile, Success);
    logger_->log_info("TailFile %s for %llu bytes", _currentTailFileName, flowFile->getSize());
    _currentTailFilePosition += flowFile->getSize();
    storeState();
  }
}

} /* namespace processors */
} /* namespace minifi */
} /* namespace nifi */
} /* namespace apache */
} /* namespace org */

#if defined(__clang__)
#pragma clang diagnostic pop
#elif defined(__GNUC__) || defined(__GNUG__)
#pragma GCC diagnostic pop
#endif
