blob: 61aa86b0def950bedfc39cdda4e5de0d537c70a5 [file] [log] [blame]
/**
* @file TailFile.cpp
* TailFile class implementation
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <sys/time.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <time.h>
#include <stdio.h>
#include <dirent.h>
#include <limits.h>
#include <unistd.h>
#include <vector>
#include <queue>
#include <map>
#include <set>
#include <memory>
#include <algorithm>
#include <sstream>
#include <string>
#include <iostream>
#include "utils/TimeUtil.h"
#include "utils/StringUtils.h"
#include "processors/TailFile.h"
#include "core/ProcessContext.h"
#include "core/ProcessSession.h"
#if defined(__clang__)
#pragma clang diagnostic push
#pragma clang diagnostic ignored "-Wsign-compare"
#elif defined(__GNUC__) || defined(__GNUG__)
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wsign-compare"
#endif
namespace org {
namespace apache {
namespace nifi {
namespace minifi {
namespace processors {
// Supported properties and relationships of the TailFile processor.
// FileName: path of the file to tail; onTrigger splits it at the last '/' or '\\'
// into a directory part and a file-name part.
core::Property TailFile::FileName("File to Tail", "Fully-qualified filename of the file that should be tailed", "");
// StateFile: base name of the state file; onTrigger appends "." plus the processor UUID string to it.
core::Property TailFile::StateFile("State File", "Specifies the file that should be used for storing state about"
                                   " what data has been ingested so that upon restart NiFi can resume from where it left off",
                                   "TailFileState");
// Delimiter: only the first character of the configured value is used by onTrigger.
// The adjacent string literals are concatenated by the compiler, so the second piece
// must start with a space to keep "tailed" and "from" apart.
core::Property TailFile::Delimiter("Input Delimiter", "Specifies the character that should be used for delimiting the data being tailed"
                                   " from the incoming file.",
                                   "");
// Success: the only relationship; every flow file produced by onTrigger is transferred to it.
core::Relationship TailFile::Success("success", "All files are routed to success");
void TailFile::initialize() {
// Set the supported properties
std::set<core::Property> properties;
properties.insert(FileName);
properties.insert(StateFile);
properties.insert(Delimiter);
setSupportedProperties(properties);
// Set the supported relationships
std::set<core::Relationship> relationships;
relationships.insert(Success);
setSupportedRelationships(relationships);
}
/**
 * Reads the "Input Delimiter" property and stores it in _delimiter.
 * When the property lookup fails, _delimiter is left untouched.
 *
 * @param context        source of the processor's configured properties
 * @param sessionFactory not used by this implementation
 */
void TailFile::onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory) {
  std::string configuredDelimiter;
  const bool delimiterFound = context->getProperty(Delimiter.getName(), configuredDelimiter);
  if (!delimiterFound) {
    return;
  }
  _delimiter = configuredDelimiter;
}
std::string TailFile::trimLeft(const std::string& s) {
return org::apache::nifi::minifi::utils::StringUtils::trimLeft(s);
}
std::string TailFile::trimRight(const std::string& s) {
return org::apache::nifi::minifi::utils::StringUtils::trimRight(s);
}
/**
 * Parses one "KEY=VALUE" line of the state file and applies it to the tail state.
 *
 * Spaces and tabs in front of the key and in front of the value are skipped.
 * The line is ignored when it is empty, starts with '#', '=', CR or LF,
 * contains no '=', or has an empty value. Key and value are passed through
 * trimRight() before use. Recognised keys:
 *   FILENAME - stored in _currentTailFileName
 *   POSITION - parsed as an unsigned integer and stored in _currentTailFilePosition
 * Any other key is silently ignored.
 *
 * @param buf NUL-terminated, writable line; the first '=' is overwritten with '\0'.
 *
 * std::stoull throws std::invalid_argument / std::out_of_range when the POSITION
 * value is not a representable number; the exception is not caught here.
 */
void TailFile::parseStateFileLine(char *buf) {
  char *line = buf;
  while ((line[0] == ' ') || (line[0] == '\t'))
    ++line;

  char first = line[0];
  if ((first == '\0') || (first == '#') || (first == '\r') || (first == '\n') || (first == '=')) {
    return;
  }

  char *equal = strchr(line, '=');
  if (equal == nullptr) {
    return;
  }

  // Terminate the key in place so that `line` becomes a C string holding only the key.
  equal[0] = '\0';
  std::string key = line;

  equal++;
  while ((equal[0] == ' ') || (equal[0] == '\t'))
    ++equal;

  first = equal[0];
  if ((first == '\0') || (first == '\r') || (first == '\n')) {
    return;
  }

  std::string value = equal;
  key = trimRight(key);
  value = trimRight(value);

  if (key == "FILENAME")
    this->_currentTailFileName = value;
  if (key == "POSITION") {
    // storeState() writes the position as a full-width integer; std::stoi would throw
    // std::out_of_range for offsets above INT_MAX (files larger than 2 GiB).
    this->_currentTailFilePosition = std::stoull(value);
  }
}
/**
 * Loads the tail state from the file named by _stateFile.
 *
 * Each line is handed to parseStateFileLine(). If the file cannot be opened,
 * an error is logged and the current state is left unchanged.
 */
void TailFile::recoverState() {
  std::ifstream file(_stateFile.c_str(), std::ifstream::in);
  if (!file.good()) {
    logger_->log_error("load state file failed %s", _stateFile);
    return;
  }

  // std::getline into a std::string has no fixed line-length limit and still yields a
  // final line that lacks a trailing newline; a fixed-size istream::getline loop guarded
  // by good() would drop that last line and stop at the first over-long line.
  std::string line;
  while (std::getline(file, line)) {
    // parseStateFileLine() edits its argument in place, so pass the string's own
    // writable, NUL-terminated buffer (valid even when the line is empty).
    parseStateFileLine(&line[0]);
  }
}
void TailFile::storeState() {
std::ofstream file(_stateFile.c_str());
if (!file.is_open()) {
logger_->log_error("store state file failed %s", _stateFile);
return;
}
file << "FILENAME=" << this->_currentTailFileName << "\n";
file << "POSITION=" << this->_currentTailFilePosition << "\n";
file.close();
}
static bool sortTailMatchedFileItem(TailMatchedFileItem i, TailMatchedFileItem j) {
return (i.modifiedTime < j.modifiedTime);
}
/**
 * Detects a roll-over of the tailed file and switches to the next file if one exists.
 *
 * Nothing happens when the current file cannot be stat'ed, still has unread data
 * (its size exceeds _currentTailFilePosition), or the directory cannot be opened.
 * Otherwise every non-directory entry of fileLocation whose name contains the
 * pattern (fileName up to, but excluding, its last '.') and whose modification time
 * is not older than the current file's is collected and sorted by modification time.
 * If the current file is found in that list and is followed by another entry, that
 * entry becomes the current file, the position is reset to 0 and the state is stored.
 *
 * @param fileLocation directory that contains the tailed file
 * @param fileName     configured name of the tailed file; used to derive the match pattern
 */
void TailFile::checkRollOver(const std::string &fileLocation, const std::string &fileName) {
  const std::string currentPath = fileLocation + "/" + _currentTailFileName;
  struct stat statbuf;
  if (stat(currentPath.c_str(), &statbuf) != 0) {
    return;
  }
  if (static_cast<uint64_t>(statbuf.st_size) > this->_currentTailFilePosition) {
    // there are new input for the current tail file
    return;
  }
  const uint64_t currentModifiedTime = static_cast<uint64_t>(statbuf.st_mtime) * 1000;

  // Pattern is the configured file name without its last extension.
  std::string pattern = fileName;
  const std::size_t lastDot = fileName.find_last_of('.');
  if (lastDot != std::string::npos) {
    pattern = fileName.substr(0, lastDot);
  }

  DIR *dir = opendir(fileLocation.c_str());
  if (!dir) {
    return;
  }

  std::vector<TailMatchedFileItem> matchedFiles;
  struct dirent *entry = nullptr;
  while ((entry = readdir(dir)) != nullptr) {
    // d_type holds a single type code, not a bit mask, so compare for equality.
    if (entry->d_type == DT_DIR) {
      continue;
    }
    const std::string candidateName = entry->d_name;
    // Match against the entry name only: matching the full path would accept every
    // file whenever the directory path itself happens to contain the pattern.
    if (candidateName.find(pattern) == std::string::npos) {
      continue;
    }
    const std::string candidatePath = fileLocation + "/" + candidateName;
    if (stat(candidatePath.c_str(), &statbuf) != 0) {
      continue;
    }
    // Catches directories on file systems that report DT_UNKNOWN, and symlinks to directories.
    if (S_ISDIR(statbuf.st_mode)) {
      continue;
    }
    const uint64_t candidateModifiedTime = static_cast<uint64_t>(statbuf.st_mtime) * 1000;
    if (candidateModifiedTime >= currentModifiedTime) {
      TailMatchedFileItem item;
      item.fileName = candidateName;
      item.modifiedTime = candidateModifiedTime;
      matchedFiles.push_back(item);
    }
  }
  closedir(dir);

  // Sort the list based on modified time
  std::sort(matchedFiles.begin(), matchedFiles.end(), sortTailMatchedFileItem);

  for (std::vector<TailMatchedFileItem>::iterator it = matchedFiles.begin(); it != matchedFiles.end(); ++it) {
    if (it->fileName != _currentTailFileName) {
      continue;
    }
    // The entry right after the current file is the one to continue with.
    std::vector<TailMatchedFileItem>::iterator next = it + 1;
    if (next != matchedFiles.end()) {
      logger_->log_info("TailFile File Roll Over from %s to %s", _currentTailFileName, next->fileName);
      _currentTailFileName = next->fileName;
      _currentTailFilePosition = 0;
      storeState();
    }
    break;
  }
}
/**
 * Performs one tailing pass: emits the data appended to the tailed file since the
 * last recorded position as flow files routed to Success.
 *
 * The whole pass runs under tail_file_mutex_. Steps:
 *  1. Split the "File to Tail" property at its last '/' or '\\' into directory and name.
 *  2. Derive the state file name ("State File" property + "." + UUID string).
 *  3. On the first call, initialise the state and try to recover it from the state file.
 *  4. Check for a roll-over, then stat the current file; yield if it has no new data.
 *  5. With a delimiter configured, import one flow file per delimited chunk; otherwise
 *     import everything from the current position as a single flow file.
 *     The position is advanced and the state stored after each flow file.
 *
 * @param context provides the configured properties and yield()
 * @param session used to create, import and transfer flow files
 */
void TailFile::onTrigger(core::ProcessContext *context, core::ProcessSession *session) {
  std::lock_guard<std::mutex> tail_lock(tail_file_mutex_);
  std::string value;
  std::string fileLocation = "";
  std::string fileName = "";
  if (context->getProperty(FileName.getName(), value)) {
    // NOTE(review): if the value contains no separator, find_last_of returns npos and
    // both fileLocation and fileName end up holding the whole value.
    std::size_t found = value.find_last_of("/\\");
    fileLocation = value.substr(0, found);
    fileName = value.substr(found + 1);
  }
  if (context->getProperty(StateFile.getName(), value)) {
    _stateFile = value + "." + getUUIDStr();
  }
  if (!this->_stateRecovered) {
    _stateRecovered = true;
    this->_currentTailFileName = fileName;
    this->_currentTailFilePosition = 0;
    // recover the state if we have not done so
    this->recoverState();
  }
  checkRollOver(fileLocation, fileName);
  std::string fullPath = fileLocation + "/" + _currentTailFileName;
  struct stat statbuf;
  if (stat(fullPath.c_str(), &statbuf) == 0) {
    if ((uint64_t)statbuf.st_size <= this->_currentTailFilePosition) {
      // there are no new input for the current tail file
      context->yield();
      return;
    }
    // Split the current file name at its last '.'; the pieces are used to build a
    // "<base>.<start>-<end>.<extension>" name for each emitted flow file.
    // NOTE(review): without a '.', baseName and extension both hold the whole name.
    std::size_t found = _currentTailFileName.find_last_of(".");
    std::string baseName = _currentTailFileName.substr(0, found);
    std::string extension = _currentTailFileName.substr(found + 1);
    if (!this->_delimiter.empty()) {
      // Only the first character of the configured delimiter is used.
      char delim = this->_delimiter.c_str()[0];
      std::vector<std::shared_ptr<FlowFileRecord>> flowFiles;
      session->import(fullPath, flowFiles, true, this->_currentTailFilePosition, delim);
      // size() is a size_t, so %zu is the matching conversion ("%ll" alone is not a valid one).
      logger_->log_info("%zu flowfiles were received from TailFile input", flowFiles.size());
      for (const auto &ffr : flowFiles) {
        logger_->log_info("TailFile %s for %llu bytes", _currentTailFileName, ffr->getSize());
        std::string logName = baseName + "." + std::to_string(_currentTailFilePosition) + "-" + std::to_string(_currentTailFilePosition + ffr->getSize()) + "." + extension;
        ffr->updateKeyedAttribute(PATH, fileLocation);
        ffr->addKeyedAttribute(ABSOLUTE_PATH, fullPath);
        ffr->updateKeyedAttribute(FILENAME, logName);
        session->transfer(ffr, Success);
        // +1 steps over one extra byte per chunk, presumably the delimiter that ended it.
        this->_currentTailFilePosition += ffr->getSize() + 1;
        storeState();
      }
    } else {
      std::shared_ptr<FlowFileRecord> flowFile = std::static_pointer_cast<FlowFileRecord>(session->create());
      if (!flowFile)
        return;
      flowFile->updateKeyedAttribute(PATH, fileLocation);
      flowFile->addKeyedAttribute(ABSOLUTE_PATH, fullPath);
      session->import(fullPath, flowFile, true, this->_currentTailFilePosition);
      session->transfer(flowFile, Success);
      logger_->log_info("TailFile %s for %llu bytes", _currentTailFileName, flowFile->getSize());
      std::string logName = baseName + "." + std::to_string(_currentTailFilePosition) + "-" + std::to_string(_currentTailFilePosition + flowFile->getSize()) + "." + extension;
      flowFile->updateKeyedAttribute(FILENAME, logName);
      this->_currentTailFilePosition += flowFile->getSize();
      storeState();
    }
  } else {
    logger_->log_warn("Unable to stat file %s", fullPath);
  }
}
} /* namespace processors */
} /* namespace minifi */
} /* namespace nifi */
} /* namespace apache */
} /* namespace org */
#if defined(__clang__)
#pragma clang diagnostic pop
#elif defined(__GNUC__) || defined(__GNUG__)
#pragma GCC diagnostic pop
#endif