| /** |
| * |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| #include "GetFile.h" |
| #include <sys/types.h> |
| #include <sys/stat.h> |
| #include <time.h> |
| #include <stdio.h> |
| #include <limits.h> |
| #ifndef WIN32 |
| #include <regex.h> |
| #else |
| #include <regex> |
| #endif |
| #include <vector> |
| #include <queue> |
| #include <map> |
| #include <memory> |
| #include <set> |
| #include <sstream> |
| #include <string> |
| #include <iostream> |
| #include "utils/StringUtils.h" |
| #include "utils/file/FileUtils.h" |
| #include "utils/TimeUtil.h" |
| #include "utils/RegexUtils.h" |
| #include "core/ProcessContext.h" |
| #include "core/ProcessSession.h" |
| #include "core/TypedValues.h" |
| |
| #define R_OK 4 /* Test for read permission. */ |
| #define W_OK 2 /* Test for write permission. */ |
| #define F_OK 0 /* Test for existence. */ |
| |
| namespace org { |
| namespace apache { |
| namespace nifi { |
| namespace minifi { |
| namespace processors { |
| |
| core::Property GetFile::BatchSize( |
| core::PropertyBuilder::createProperty("Batch Size")->withDescription("The maximum number of files to pull in each iteration")->withDefaultValue<uint32_t>(10)->build()); |
| |
| core::Property GetFile::Directory( |
| core::PropertyBuilder::createProperty("Input Directory")->withDescription("The input directory from which to pull files")->isRequired(true)->supportsExpressionLanguage(true) |
| ->build()); |
| |
| core::Property GetFile::IgnoreHiddenFile( |
| core::PropertyBuilder::createProperty("Ignore Hidden Files")->withDescription("Indicates whether or not hidden files should be ignored")->withDefaultValue<bool>(true)->build()); |
| |
| core::Property GetFile::KeepSourceFile( |
| core::PropertyBuilder::createProperty("Keep Source File")->withDescription("If true, the file is not deleted after it has been copied to the Content Repository")->withDefaultValue<bool>(false) |
| ->build()); |
| |
| core::Property GetFile::MaxAge( |
| core::PropertyBuilder::createProperty("Maximum File Age")->withDescription("The maximum age that a file must be in order to be pulled;" |
| " any file older than this amount of time (according to last modification date) will be ignored") |
| ->withDefaultValue<core::TimePeriodValue>("0 sec")->build()); |
| |
| core::Property GetFile::MinAge( |
| core::PropertyBuilder::createProperty("Minimum File Age")->withDescription("The minimum age that a file must be in order to be pulled;" |
| " any file younger than this amount of time (according to last modification date) will be ignored") |
| ->withDefaultValue<core::TimePeriodValue>("0 sec")->build()); |
| |
| core::Property GetFile::MaxSize( |
| core::PropertyBuilder::createProperty("Maximum File Size")->withDescription("The maximum size that a file can be in order to be pulled")->withDefaultValue<core::DataSizeValue>("0 B")->build()); |
| |
| core::Property GetFile::MinSize( |
| core::PropertyBuilder::createProperty("Minimum File Size")->withDescription("The minimum size that a file can be in order to be pulled")->withDefaultValue<core::DataSizeValue>("0 B")->build()); |
| |
| core::Property GetFile::PollInterval( |
| core::PropertyBuilder::createProperty("Polling Interval")->withDescription("Indicates how long to wait before performing a directory listing")->withDefaultValue<core::TimePeriodValue>("0 sec") |
| ->build()); |
| |
| core::Property GetFile::Recurse( |
| core::PropertyBuilder::createProperty("Recurse Subdirectories")->withDescription("Indicates whether or not to pull files from subdirectories")->withDefaultValue<bool>(true)->build()); |
| |
| core::Property GetFile::FileFilter( |
| core::PropertyBuilder::createProperty("File Filter")->withDescription("Only files whose names match the given regular expression will be picked up")->withDefaultValue("[^\\.].*")->build()); |
| |
| core::Relationship GetFile::Success("success", "All files are routed to success"); |
| |
| void GetFile::initialize() { |
| // Set the supported properties |
| std::set<core::Property> properties; |
| properties.insert(BatchSize); |
| properties.insert(Directory); |
| properties.insert(IgnoreHiddenFile); |
| properties.insert(KeepSourceFile); |
| properties.insert(MaxAge); |
| properties.insert(MinAge); |
| properties.insert(MaxSize); |
| properties.insert(MinSize); |
| properties.insert(PollInterval); |
| properties.insert(Recurse); |
| properties.insert(FileFilter); |
| setSupportedProperties(properties); |
| // Set the supported relationships |
| std::set<core::Relationship> relationships; |
| relationships.insert(Success); |
| setSupportedRelationships(relationships); |
| } |
| |
| void GetFile::onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory) { |
| std::string value; |
| if (context->getProperty(BatchSize.getName(), value)) { |
| core::Property::StringToInt(value, request_.batchSize); |
| } |
| if (context->getProperty(IgnoreHiddenFile.getName(), value)) { |
| org::apache::nifi::minifi::utils::StringUtils::StringToBool(value, request_.ignoreHiddenFile); |
| } |
| if (context->getProperty(KeepSourceFile.getName(), value)) { |
| org::apache::nifi::minifi::utils::StringUtils::StringToBool(value, request_.keepSourceFile); |
| } |
| |
| context->getProperty(MaxAge.getName(), request_.maxAge); |
| context->getProperty(MinAge.getName(), request_.minAge); |
| |
| if (context->getProperty(MaxSize.getName(), value)) { |
| core::Property::StringToInt(value, request_.maxSize); |
| } |
| if (context->getProperty(MinSize.getName(), value)) { |
| core::Property::StringToInt(value, request_.minSize); |
| } |
| |
| context->getProperty(PollInterval.getName(), request_.pollInterval); |
| |
| if (context->getProperty(Recurse.getName(), value)) { |
| org::apache::nifi::minifi::utils::StringUtils::StringToBool(value, request_.recursive); |
| } |
| |
| if (context->getProperty(FileFilter.getName(), value)) { |
| request_.fileFilter = value; |
| } |
| |
| if (!context->getProperty(Directory.getName(), value)) { |
| throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Input Directory property is missing"); |
| } |
| if (!utils::file::FileUtils::is_directory(value.c_str())) { |
| throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Input Directory \"" + value + "\" is not a directory"); |
| } |
| request_.inputDirectory = value; |
| } |
| |
| void GetFile::onTrigger(core::ProcessContext *context, core::ProcessSession *session) { |
| // Perform directory list |
| |
| metrics_->iterations_++; |
| |
| const bool isDirEmptyBeforePoll = isListingEmpty(); |
| logger_->log_debug("Is listing empty before polling directory %i", isDirEmptyBeforePoll); |
| if (isDirEmptyBeforePoll) { |
| if (request_.pollInterval == 0 || (utils::timeutils::getTimeMillis() - last_listing_time_) > request_.pollInterval) { |
| performListing(request_); |
| last_listing_time_.store(utils::timeutils::getTimeMillis()); |
| } |
| } |
| |
| const bool isDirEmptyAfterPoll = isListingEmpty(); |
| logger_->log_debug("Is listing empty after polling directory %i", isDirEmptyAfterPoll); |
| |
| if (!isDirEmptyAfterPoll) { |
| try { |
| std::queue<std::string> list; |
| pollListing(list, request_); |
| while (!list.empty()) { |
| std::string fileName = list.front(); |
| list.pop(); |
| logger_->log_info("GetFile process %s", fileName); |
| std::shared_ptr<FlowFileRecord> flowFile = std::static_pointer_cast<FlowFileRecord>(session->create()); |
| if (flowFile == nullptr) |
| return; |
| std::size_t found = fileName.find_last_of("/\\"); |
| std::string path = fileName.substr(0, found); |
| std::string name = fileName.substr(found + 1); |
| flowFile->updateKeyedAttribute(FILENAME, name); |
| flowFile->updateKeyedAttribute(PATH, path); |
| flowFile->addKeyedAttribute(ABSOLUTE_PATH, fileName); |
| session->import(fileName, flowFile, request_.keepSourceFile); |
| session->transfer(flowFile, Success); |
| } |
| } catch (std::exception &exception) { |
| logger_->log_debug("GetFile Caught Exception %s", exception.what()); |
| throw; |
| } catch (...) { |
| throw; |
| } |
| } |
| } |
| |
| bool GetFile::isListingEmpty() { |
| std::lock_guard<std::mutex> lock(mutex_); |
| |
| return _dirList.empty(); |
| } |
| |
| void GetFile::putListing(std::string fileName) { |
| logger_->log_trace("Adding file to queue: %s", fileName); |
| |
| std::lock_guard<std::mutex> lock(mutex_); |
| |
| _dirList.push(fileName); |
| } |
| |
| void GetFile::pollListing(std::queue<std::string> &list, const GetFileRequest &request) { |
| std::lock_guard<std::mutex> lock(mutex_); |
| |
| while (!_dirList.empty() && (request.batchSize == 0 || list.size() < request.batchSize)) { |
| list.push(_dirList.front()); |
| _dirList.pop(); |
| } |
| } |
| |
| bool GetFile::acceptFile(std::string fullName, std::string name, const GetFileRequest &request) { |
| logger_->log_trace("Checking file: %s", fullName); |
| |
| struct stat statbuf; |
| |
| if (stat(fullName.c_str(), &statbuf) == 0) { |
| if (request.minSize > 0 && statbuf.st_size < (int32_t) request.minSize) |
| return false; |
| |
| if (request.maxSize > 0 && statbuf.st_size > (int32_t) request.maxSize) |
| return false; |
| |
| uint64_t modifiedTime = ((uint64_t) (statbuf.st_mtime) * 1000); |
| uint64_t fileAge = utils::timeutils::getTimeMillis() - modifiedTime; |
| if (request.minAge > 0 && fileAge < request.minAge) |
| return false; |
| if (request.maxAge > 0 && fileAge > request.maxAge) |
| return false; |
| |
| if (request.ignoreHiddenFile && utils::file::FileUtils::is_hidden(fullName)) |
| return false; |
| |
| if (utils::file::FileUtils::access(fullName.c_str(), R_OK) != 0) |
| return false; |
| |
| if (request.keepSourceFile == false && utils::file::FileUtils::access(fullName.c_str(), W_OK) != 0) |
| return false; |
| |
| utils::Regex rgx(request.fileFilter); |
| if (!rgx.match(name)) { |
| return false; |
| } |
| |
| metrics_->input_bytes_ += statbuf.st_size; |
| metrics_->accepted_files_++; |
| return true; |
| } |
| |
| return false; |
| } |
| |
| void GetFile::performListing(const GetFileRequest &request) { |
| auto callback = [this, request](const std::string& dir, const std::string& filename) -> bool { |
| std::string fullpath = dir + utils::file::FileUtils::get_separator() + filename; |
| if (acceptFile(fullpath, filename, request)) { |
| putListing(fullpath); |
| } |
| return isRunning(); |
| }; |
| utils::file::FileUtils::list_dir(request.inputDirectory, callback, logger_, request.recursive); |
| } |
| |
| int16_t GetFile::getMetricNodes(std::vector<std::shared_ptr<state::response::ResponseNode>> &metric_vector) { |
| metric_vector.push_back(metrics_); |
| return 0; |
| } |
| |
| } // namespace processors |
| } // namespace minifi |
| } // namespace nifi |
| } // namespace apache |
| } // namespace org |