blob: b715fde1239da45251a68841c7e2ba82533914e6 [file] [log] [blame]
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <stdio.h>
#include <fstream>
#include <map>
#include <memory>
#include <utility>
#include <string>
#include <iostream>
#include <set>
#include <algorithm>
#include <random>
#include <cstdlib>
#include "FlowController.h"
#include "TestBase.h"
#include "core/Core.h"
#include "core/FlowFile.h"
#include "utils/file/FileUtils.h"
#include "unit/ProvenanceTestHelper.h"
#include "core/Processor.h"
#include "core/ProcessContext.h"
#include "core/ProcessSession.h"
#include "core/ProcessorNode.h"
#include "TailFile.h"
#include "LogAttribute.h"
// Fixture content shared by the tests below: a first line terminated by '\n'
// followed by a second line that has no trailing delimiter.
static std::string NEWLINE_FILE = "one,two,three\nfour,five,six, seven";  // NOLINT
// Base name of the file that the tests create and tail.
static const char *TMP_FILE = "minifi-tmpfile.txt";
// Base name handed to the TailFile "StateFile" property.
static const char *STATE_FILE = "minifi-state-file.txt";
/**
 * Tails the two-line fixture with "\n" configured as the delimiter and expects
 * a single flow file whose size equals the length of the first line.
 */
TEST_CASE("TailFileWithDelimiter", "[tailfiletest2]") {
  // Create and write to the test file
  TestController testController;
  LogTestController::getInstance().setTrace<minifi::processors::TailFile>();
  LogTestController::getInstance().setDebug<minifi::processors::LogAttribute>();

  std::shared_ptr<TestPlan> plan = testController.createPlan();
  std::shared_ptr<core::Processor> tailfile = plan->addProcessor("TailFile", "tailfileProc");
  auto id = tailfile->getUUIDStr();
  plan->addProcessor("LogAttribute", "logattribute", core::Relationship("success", "description"), true);

  char format[] = "/tmp/gt.XXXXXX";
  auto dir = testController.createTempDirectory(format);

  std::stringstream temp_file;
  temp_file << dir << utils::file::FileUtils::get_separator() << TMP_FILE;

  // Binary mode keeps the '\n' bytes untouched so the expected size below holds.
  std::ofstream tmpfile;
  tmpfile.open(temp_file.str(), std::ios::out | std::ios::binary);
  tmpfile << NEWLINE_FILE;
  tmpfile.close();

  std::stringstream state_file;
  state_file << dir << utils::file::FileUtils::get_separator() << STATE_FILE;

  plan->setProperty(tailfile, minifi::processors::TailFile::FileName.getName(), temp_file.str());
  plan->setProperty(tailfile, minifi::processors::TailFile::StateFile.getName(), state_file.str());
  plan->setProperty(tailfile, minifi::processors::TailFile::Delimiter.getName(), "\n");

  testController.runSession(plan, false);
  auto records = plan->getProvenanceRecords();
  REQUIRE(records.size() == 2);

  testController.runSession(plan, false);
  REQUIRE(LogTestController::getInstance().contains("Logged 1 flow files"));
  // The expected size is the offset of the first '\n', i.e. the first line without its delimiter.
  REQUIRE(LogTestController::getInstance().contains("Size:" + std::to_string(NEWLINE_FILE.find_first_of('\n')) + " Offset:0"));

  LogTestController::getInstance().reset();

  // Delete the state file. The StateFile property points inside the temp
  // directory, so the path must be built from state_file, not from the bare
  // STATE_FILE name (which would resolve against the working directory).
  remove(std::string(state_file.str() + "." + id).c_str());
}
/**
 * Tails the fixture once, appends a newline to complete the second line, and
 * expects the next run (with the state kept) to resume at position 14 and to
 * emit the 14-34 range.
 */
TEST_CASE("TestNewContent", "[tailFileWithDelimiterState]") {
  // Create and write to the test file
  TestController testController;
  LogTestController::getInstance().setTrace<minifi::processors::TailFile>();
  LogTestController::getInstance().setDebug<core::ProcessSession>();
  LogTestController::getInstance().setTrace<TestPlan>();
  LogTestController::getInstance().setDebug<minifi::processors::LogAttribute>();

  std::shared_ptr<TestPlan> plan = testController.createPlan();
  std::shared_ptr<core::Processor> tailfile = plan->addProcessor("TailFile", "tailfileProc");
  auto id = tailfile->getUUIDStr();
  plan->addProcessor("LogAttribute", "logattribute", core::Relationship("success", "description"), true);

  char format[] = "/tmp/gt.XXXXXX";
  auto dir = testController.createTempDirectory(format);

  std::stringstream temp_file;
  temp_file << dir << utils::file::FileUtils::get_separator() << TMP_FILE;

  std::ofstream tmpfile;
  tmpfile.open(temp_file.str(), std::ios::out | std::ios::binary);
  tmpfile << NEWLINE_FILE;
  tmpfile.close();

  std::stringstream state_file;
  state_file << dir << utils::file::FileUtils::get_separator() << STATE_FILE;

  plan->setProperty(tailfile, minifi::processors::TailFile::FileName.getName(), temp_file.str());
  plan->setProperty(tailfile, minifi::processors::TailFile::StateFile.getName(), state_file.str());
  plan->setProperty(tailfile, minifi::processors::TailFile::Delimiter.getName(), "\n");

  testController.runSession(plan, true);
  REQUIRE(LogTestController::getInstance().contains("minifi-tmpfile.0-13.txt"));

  plan->reset(true);  // start a new but with state file

  // Terminate the second line; std::endl also flushes, so the byte is on disk
  // before the next session runs.
  std::ofstream appendStream;
  appendStream.open(temp_file.str(), std::ios_base::app | std::ios_base::binary);
  appendStream << std::endl;

  testController.runSession(plan, true);
  REQUIRE(LogTestController::getInstance().contains("position 14"));
  REQUIRE(LogTestController::getInstance().contains("minifi-tmpfile.14-34.txt"));

  LogTestController::getInstance().reset();

  // Delete the state file. It lives next to the tailed file in the temp
  // directory, so the path is derived from state_file rather than from the
  // bare STATE_FILE name.
  remove(std::string(state_file.str() + "." + id).c_str());
}
/**
 * Runs TailFile once, deletes the "<state file>.<processor uuid>" file, and
 * expects the following run to log "position 0", i.e. to start over.
 */
TEST_CASE("TestDeleteState", "[tailFileWithDelimiterState]") {
  TestController testController;

  auto &log_controller = LogTestController::getInstance();
  log_controller.setTrace<minifi::processors::TailFile>();
  log_controller.setDebug<core::ProcessSession>();
  log_controller.setTrace<TestPlan>();
  log_controller.setDebug<minifi::processors::LogAttribute>();

  std::shared_ptr<TestPlan> plan = testController.createPlan();
  std::shared_ptr<core::Processor> tailfile = plan->addProcessor("TailFile", "tailfileProc");
  auto id = tailfile->getUUIDStr();
  plan->addProcessor("LogAttribute", "logattribute", core::Relationship("success", "description"), true);

  char format[] = "/tmp/gt.XXXXXX";
  auto dir = testController.createTempDirectory(format);

  // <temp dir>/<TMP_FILE>: the file being tailed.
  std::string tailed_path(dir);
  tailed_path += utils::file::FileUtils::get_separator();
  tailed_path += TMP_FILE;

  // <temp dir>/<STATE_FILE>: value of the StateFile property.
  std::string state_path(dir);
  state_path += utils::file::FileUtils::get_separator();
  state_path += STATE_FILE;

  {
    // Write the fixture; the stream is closed when it leaves this scope.
    std::ofstream fixture(tailed_path, std::ios::out | std::ios::binary);
    fixture << NEWLINE_FILE;
  }

  plan->setProperty(tailfile, minifi::processors::TailFile::FileName.getName(), tailed_path);
  plan->setProperty(tailfile, minifi::processors::TailFile::StateFile.getName(), state_path);
  plan->setProperty(tailfile, minifi::processors::TailFile::Delimiter.getName(), "\n");

  // First pass over the file.
  testController.runSession(plan, true);
  REQUIRE(log_controller.contains("minifi-tmpfile.0-13.txt"));

  // Restart the plan, then drop the persisted state before the second pass.
  plan->reset(true);
  const std::string persisted_state = state_path + "." + id;
  remove(persisted_state.c_str());

  testController.runSession(plan, true);
  REQUIRE(log_controller.contains("position 0"));
  // With the state gone the processor is expected to restart from the beginning.
  REQUIRE(log_controller.contains("minifi-tmpfile.0-13.txt"));
}
/**
 * Rewrites the persisted state file by hand before each run and checks that
 * TailFile logs the position that was written: first five times with
 * POSITION=14, then once for every position from 14 to 33.
 */
TEST_CASE("TestChangeState", "[tailFileWithDelimiterState]") {
  TestController testController;

  auto &log_controller = LogTestController::getInstance();
  log_controller.setTrace<minifi::processors::TailFile>();
  log_controller.setDebug<core::ProcessSession>();
  log_controller.setTrace<TestPlan>();
  log_controller.setDebug<minifi::processors::LogAttribute>();

  std::shared_ptr<TestPlan> plan = testController.createPlan();
  std::shared_ptr<core::Processor> tailfile = plan->addProcessor("TailFile", "tailfileProc");
  auto id = tailfile->getUUIDStr();
  plan->addProcessor("LogAttribute", "logattribute", core::Relationship("success", "description"), true);

  char format[] = "/tmp/gt.XXXXXX";
  auto dir = testController.createTempDirectory(format);

  std::string tailed_path(dir);
  tailed_path += utils::file::FileUtils::get_separator();
  tailed_path += TMP_FILE;

  {
    // Fixture content ...
    std::ofstream fixture(tailed_path, std::ios::out | std::ios::binary);
    fixture << NEWLINE_FILE;
  }
  {
    // ... followed by a trailing newline so that the second line is complete.
    std::ofstream appender(tailed_path, std::ios_base::app | std::ios_base::binary);
    appender.write("\n", 1);
  }

  std::string state_path(dir);
  state_path += utils::file::FileUtils::get_separator();
  state_path += STATE_FILE;

  plan->setProperty(tailfile, minifi::processors::TailFile::FileName.getName(), tailed_path);
  plan->setProperty(tailfile, minifi::processors::TailFile::StateFile.getName(), state_path);
  plan->setProperty(tailfile, minifi::processors::TailFile::Delimiter.getName(), "\n");

  testController.runSession(plan, true);
  REQUIRE(log_controller.contains("minifi-tmpfile.0-13.txt"));

  // Path of the state file the test overwrites: "<state file>.<processor uuid>".
  const std::string persisted_state = state_path + "." + id;

  // Replaces the persisted state with one that points at the tailed file and
  // the given position.
  const auto overwrite_state = [&](int position) {
    remove(persisted_state.c_str());
    std::ofstream out;
    out.open(persisted_state);
    out << "FILENAME=" << tailed_path << std::endl;
    out << "POSITION=" << position << std::endl;
    out.close();
  };

  // should stay the same
  for (int attempt = 0; attempt < 5; ++attempt) {
    plan->reset(true);  // start a new but with state file
    overwrite_state(14);
    testController.runSession(plan, true);
    REQUIRE(log_controller.contains("position 14"));
    REQUIRE(log_controller.contains("minifi-tmpfile.14-34.txt"));
  }

  for (int position = 14; position < 34; ++position) {
    plan->reset(true);  // start a new but with state file
    overwrite_state(position);
    testController.runSession(plan, true);
    REQUIRE(log_controller.contains("position " + std::to_string(position)));
  }

  plan->runCurrentProcessor();

  // The log is not reset between runs, so every range emitted above is still searchable.
  for (int position = 14; position < 34; ++position) {
    REQUIRE(log_controller.contains("minifi-tmpfile." + std::to_string(position) + "-34.txt"));
  }

  // Delete the state file.
  remove(persisted_state.c_str());
}
/**
 * Replaces the persisted state with malformed content (no FILENAME, a CURRENT
 * entry that does not match, or a FILENAME without a directory) and expects
 * the next session to throw in each case.
 */
TEST_CASE("TestInvalidState", "[tailFileWithDelimiterState]") {
  // Create and write to the test file
  TestController testController;
  LogTestController::getInstance().setTrace<minifi::processors::TailFile>();
  LogTestController::getInstance().setDebug<core::ProcessSession>();
  LogTestController::getInstance().setTrace<TestPlan>();
  LogTestController::getInstance().setDebug<minifi::processors::LogAttribute>();

  std::shared_ptr<TestPlan> plan = testController.createPlan();
  std::shared_ptr<core::Processor> tailfile = plan->addProcessor("TailFile", "tailfileProc");
  auto id = tailfile->getUUIDStr();
  plan->addProcessor("LogAttribute", "logattribute", core::Relationship("success", "description"), true);

  char format[] = "/tmp/gt.XXXXXX";
  auto dir = testController.createTempDirectory(format);

  std::stringstream temp_file;
  temp_file << dir << utils::file::FileUtils::get_separator() << TMP_FILE;

  // Unlike the other tests, this file is opened in text mode; the WIN32
  // branch below expects a different range, presumably because '\n' is
  // expanded on write there.
  std::ofstream tmpfile;
  tmpfile.open(temp_file.str());
  tmpfile << NEWLINE_FILE;
  tmpfile.close();

  std::ofstream appendStream;
  appendStream.open(temp_file.str(), std::ios_base::app);
  appendStream.write("\n", 1);
  appendStream.close();

  std::stringstream state_file;
  state_file << dir << utils::file::FileUtils::get_separator() << STATE_FILE;

  plan->setProperty(tailfile, minifi::processors::TailFile::FileName.getName(), temp_file.str());
  plan->setProperty(tailfile, minifi::processors::TailFile::StateFile.getName(), state_file.str());
  plan->setProperty(tailfile, minifi::processors::TailFile::Delimiter.getName(), "\n");

  testController.runSession(plan, true);
#ifdef WIN32
  REQUIRE(LogTestController::getInstance().contains("minifi-tmpfile.0-14.txt"));
#else
  REQUIRE(LogTestController::getInstance().contains("minifi-tmpfile.0-13.txt"));
#endif

  plan->reset(true);  // start a new but with state file

  // Drop the valid state; each section below writes its own invalid replacement.
  auto statefile = state_file.str() + "." + id;
  remove(statefile.c_str());

  SECTION("No Filename") {
    std::ofstream newstatefile;
    newstatefile.open(statefile);
    newstatefile << "POSITION=14" << std::endl;
    newstatefile.close();
    REQUIRE_THROWS(testController.runSession(plan, true));
  }

  SECTION("Invalid current filename") {
    std::ofstream newstatefile;
    newstatefile.open(statefile);
    newstatefile << "FILENAME=minifi-tmpfile.txt" << std::endl;
    newstatefile << "CURRENT.minifi-tempfile.txt=minifi-tmpfile.txt" << std::endl;
    newstatefile << "POSITION=14" << std::endl;
    newstatefile.close();
    REQUIRE_THROWS(testController.runSession(plan, true));
  }

  SECTION("No current filename and partial path") {
    std::ofstream newstatefile;
    newstatefile.open(statefile);
    newstatefile << "FILENAME=minifi-tmpfile.txt" << std::endl;
    newstatefile << "POSITION=14" << std::endl;
    newstatefile.close();
    REQUIRE_THROWS(testController.runSession(plan, true));
  }

  // Delete the state file that the section wrote. It lives in the temp
  // directory, so the full path computed above is used instead of the bare
  // STATE_FILE name.
  remove(statefile.c_str());
}
/**
 * Tails the fixture without configuring a delimiter and expects one flow file
 * whose size equals the size of the whole fixture.
 */
TEST_CASE("TailFileWithOutDelimiter", "[tailfiletest2]") {
  TestController testController;

  auto &log_controller = LogTestController::getInstance();
  log_controller.setTrace<minifi::processors::TailFile>();
  log_controller.setDebug<core::ProcessSession>();
  log_controller.setDebug<minifi::processors::LogAttribute>();

  std::shared_ptr<TestPlan> plan = testController.createPlan();
  std::shared_ptr<core::Processor> tailfile = plan->addProcessor("TailFile", "tailfileProc");
  auto id = tailfile->getUUIDStr();
  plan->addProcessor("LogAttribute", "logattribute", core::Relationship("success", "description"), true);

  char format[] = "/tmp/gt.XXXXXX";
  auto dir = testController.createTempDirectory(format);

  std::string tailed_path(dir);
  tailed_path += utils::file::FileUtils::get_separator();
  tailed_path += TMP_FILE;

  std::string state_path(dir);
  state_path += utils::file::FileUtils::get_separator();
  state_path += STATE_FILE;

  {
    // Write the fixture; the stream is closed when it leaves this scope.
    std::ofstream fixture(tailed_path, std::ios::out | std::ios::binary);
    fixture << NEWLINE_FILE;
  }

  // No Delimiter property is set in this test.
  plan->setProperty(tailfile, minifi::processors::TailFile::FileName.getName(), tailed_path);
  plan->setProperty(tailfile, minifi::processors::TailFile::StateFile.getName(), state_path);

  testController.runSession(plan, false);
  auto records = plan->getProvenanceRecords();
  REQUIRE(records.size() == 2);

  testController.runSession(plan, false);
  REQUIRE(log_controller.contains("Logged 1 flow files"));
  const std::string expected_size = "Size:" + std::to_string(NEWLINE_FILE.size()) + " Offset:0";
  REQUIRE(log_controller.contains(expected_size));

  log_controller.reset();
}
/**
 * Tails a file whose second line is 8050 random printable characters and
 * checks the hex-encoded payload written by LogAttribute, once per section
 * with a different maximum payload line length.
 */
TEST_CASE("TailFileLongWithDelimiter", "[tailfiletest2]") {
  std::string line1("foo");

  // 8050 random characters in the range [32, 127).
  std::string line2(8050, 0);
  std::mt19937 gen(std::random_device { }());
  for (char &ch : line2) {
    ch = static_cast<char>(32 + gen() % (127 - 32));
  }

  std::string line3("bar");
  std::string line4("buzz");

  TestController testController;

  auto &log_controller = LogTestController::getInstance();
  log_controller.setTrace<TestPlan>();
  log_controller.setTrace<processors::TailFile>();
  log_controller.setTrace<processors::LogAttribute>();
  log_controller.setTrace<core::ProcessSession>();

  std::shared_ptr<TestPlan> plan = testController.createPlan();
  std::shared_ptr<core::Processor> tailfile = plan->addProcessor("TailFile", "tailfileProc");

  char format[] = "/tmp/gt.XXXXXX";
  auto dir = testController.createTempDirectory(format);

  std::string tailed_path(dir);
  tailed_path += utils::file::FileUtils::get_separator();
  tailed_path += TMP_FILE;

  {
    // line4 is written without a trailing delimiter.
    std::ofstream fixture(tailed_path, std::ios::out | std::ios::binary);
    fixture << line1 << "\n" << line2 << "\n" << line3 << "\n" << line4;
  }

  std::string state_path(dir);
  state_path += utils::file::FileUtils::get_separator();
  state_path += STATE_FILE;

  plan->setProperty(tailfile, processors::TailFile::FileName.getName(), tailed_path);
  plan->setProperty(tailfile, processors::TailFile::StateFile.getName(), state_path);
  plan->setProperty(tailfile, processors::TailFile::Delimiter.getName(), "\n");

  std::shared_ptr<core::Processor> log_attr = plan->addProcessor("LogAttribute", "Log", core::Relationship("success", "description"), true);
  plan->setProperty(log_attr, processors::LogAttribute::FlowFilesToLog.getName(), "0");
  plan->setProperty(log_attr, processors::LogAttribute::LogPayload.getName(), "true");
  plan->setProperty(log_attr, processors::LogAttribute::HexencodePayload.getName(), "true");

  // Line length the hex payload is expected to be wrapped at; 0 means "not wrapped".
  uint32_t line_length = 0U;
  SECTION("with line length 80") {
    // MaxPayloadLineLength is left unset in this section.
    line_length = 80U;
  }
  SECTION("with line length 200") {
    line_length = 200U;
    plan->setProperty(log_attr, processors::LogAttribute::MaxPayloadLineLength.getName(), "200");
  }
  SECTION("with line length 0") {
    line_length = 0U;
    plan->setProperty(log_attr, processors::LogAttribute::MaxPayloadLineLength.getName(), "0");
  }
  SECTION("with line length 16") {
    line_length = 16U;
    plan->setProperty(log_attr, processors::LogAttribute::MaxPayloadLineLength.getName(), "16");
  }

  testController.runSession(plan, true);

  REQUIRE(log_controller.contains("Logged 3 flow files"));
  REQUIRE(log_controller.contains(utils::StringUtils::to_hex(line1)));

  // Build the text expected in the log for line2: the raw hex string when
  // unwrapped, otherwise chunks of line_length characters, each followed by '\n'.
  const auto line2_hex = utils::StringUtils::to_hex(line2);
  std::string expected_line2;
  if (line_length == 0U) {
    expected_line2 = line2_hex;
  } else {
    for (size_t offset = 0; offset < line2_hex.size(); offset += line_length) {
      expected_line2.append(line2_hex, offset, line_length);
      expected_line2.push_back('\n');
    }
  }
  REQUIRE(log_controller.contains(expected_line2));

  REQUIRE(log_controller.contains(utils::StringUtils::to_hex(line3)));

  // The unterminated last line must not have been logged; zero timeout, so no waiting.
  const bool line4_logged = log_controller.contains(utils::StringUtils::to_hex(line4), std::chrono::seconds(0));
  REQUIRE(false == line4_logged);

  log_controller.reset();
}
/**
 * Tails a file that starts with 4095 random characters followed by three
 * consecutive '\n' delimiters, then "foo", "bar" and an unterminated "buzz".
 * Expects five flow files to be logged.
 */
TEST_CASE("TailFileWithDelimiterMultipleDelimiters", "[tailfiletest2]") {
  // Test having two delimiters on the buffer boundary
  const size_t random_prefix_length = 4095;

  // 4097 characters: the random prefix followed by two '\n'.
  std::string line1(4097, '\n');
  std::mt19937 gen(std::random_device { }());
  for (size_t pos = 0; pos < random_prefix_length; ++pos) {
    line1[pos] = static_cast<char>(32 + gen() % (127 - 32));
  }

  std::string line2("foo");
  std::string line3("bar");
  std::string line4("buzz");

  TestController testController;

  auto &log_controller = LogTestController::getInstance();
  log_controller.setTrace<TestPlan>();
  log_controller.setTrace<processors::TailFile>();
  log_controller.setTrace<processors::LogAttribute>();
  log_controller.setTrace<core::ProcessSession>();

  std::shared_ptr<TestPlan> plan = testController.createPlan();
  std::shared_ptr<core::Processor> tailfile = plan->addProcessor("TailFile", "tailfileProc");
  auto id = tailfile->getUUIDStr();

  char format[] = "/tmp/gt.XXXXXX";
  auto dir = testController.createTempDirectory(format);

  std::string tailed_path(dir);
  tailed_path += utils::file::FileUtils::get_separator();
  tailed_path += TMP_FILE;

  {
    // Create and write to the test file; line4 has no trailing delimiter.
    std::ofstream fixture(tailed_path, std::ios::out | std::ios::binary);
    fixture << line1 << "\n" << line2 << "\n" << line3 << "\n" << line4;
  }

  std::string state_path(dir);
  state_path += utils::file::FileUtils::get_separator();
  state_path += STATE_FILE;

  plan->setProperty(tailfile, processors::TailFile::FileName.getName(), tailed_path);
  plan->setProperty(tailfile, processors::TailFile::StateFile.getName(), state_path);
  plan->setProperty(tailfile, processors::TailFile::Delimiter.getName(), "\n");

  std::shared_ptr<core::Processor> log_attr = plan->addProcessor("LogAttribute", "Log", core::Relationship("success", "description"), true);
  plan->setProperty(log_attr, processors::LogAttribute::FlowFilesToLog.getName(), "0");
  plan->setProperty(log_attr, processors::LogAttribute::LogPayload.getName(), "true");
  plan->setProperty(log_attr, processors::LogAttribute::HexencodePayload.getName(), "true");

  testController.runSession(plan, true);

  REQUIRE(log_controller.contains("Logged 5 flow files"));

  // Expected log text for the random prefix: its hex encoding in chunks of 80
  // characters, each followed by '\n'.
  const auto line1_hex = utils::StringUtils::to_hex(line1.substr(0, random_prefix_length));
  std::string expected_line1;
  for (size_t offset = 0; offset < line1_hex.size(); offset += 80) {
    expected_line1.append(line1_hex, offset, 80);
    expected_line1.push_back('\n');
  }
  REQUIRE(log_controller.contains(expected_line1));

  REQUIRE(log_controller.contains(utils::StringUtils::to_hex(line2)));
  REQUIRE(log_controller.contains(utils::StringUtils::to_hex(line3)));

  // The unterminated last line must not have been logged; zero timeout, so no waiting.
  const bool line4_logged = log_controller.contains(utils::StringUtils::to_hex(line4), std::chrono::seconds(0));
  REQUIRE(false == line4_logged);

  log_controller.reset();
}
/**
 * Configures TailFile in "Multiple file" mode without a base directory (and,
 * in one section, without a file name either) and expects running the
 * processor to throw.
 */
TEST_CASE("TailWithInvalid", "[tailfiletest2]") {
  TestController testController;
  LogTestController::getInstance().setDebug<minifi::processors::LogAttribute>();

  std::shared_ptr<TestPlan> plan = testController.createPlan();
  std::shared_ptr<core::Processor> tailfile = plan->addProcessor("TailFile", "tailfileProc");
  auto id = tailfile->getUUIDStr();
  plan->addProcessor("LogAttribute", "logattribute", core::Relationship("success", "description"), true);

  char format[] = "/tmp/gt.XXXXXX";
  auto dir = testController.createTempDirectory(format);

  const auto tail_mode_property = minifi::processors::TailFile::TailMode.getName();

  SECTION("No File and No base") {
    plan->setProperty(tailfile, tail_mode_property, "Multiple file");
  }

  SECTION("No base") {
    plan->setProperty(tailfile, minifi::processors::TailFile::FileName.getName(), "minifi-.*\\.txt");
    plan->setProperty(tailfile, tail_mode_property, "Multiple file");
  }

  plan->setProperty(tailfile, minifi::processors::TailFile::StateFile.getName(), STATE_FILE);

  REQUIRE_THROWS(plan->runNextProcessor());
}
/**
 * Tails a file using ',' as the delimiter, then renames it to "<name>.1",
 * creates a new file under the original name and checks that the remainder
 * of the rotated file is picked up before the content of the new file.
 * Runs once in single-file mode and once in "Multiple file" mode.
 */
TEST_CASE("TailFileWithRealDelimiterAndRotate", "[tailfiletest2]") {
  TestController testController;

  const char DELIM = ',';
  size_t expected_pieces = std::count(NEWLINE_FILE.begin(), NEWLINE_FILE.end(), DELIM);  // The last piece is left as considered unfinished

  LogTestController::getInstance().setTrace<TestPlan>();
  LogTestController::getInstance().setTrace<processors::TailFile>();
  LogTestController::getInstance().setTrace<processors::LogAttribute>();

  auto plan = testController.createPlan();

  char format[] = "/tmp/gt.XXXXXX";
  auto dir = testController.createTempDirectory(format);

  // Define test input file
  std::string in_file(dir);
  in_file += utils::file::FileUtils::get_separator();
  in_file += "testfifo.txt";

  // Keep the state file inside the temp directory: without the separator the
  // name would merely be appended to the directory name and the file would be
  // created next to the directory instead of in it.
  std::string state_file(dir);
  state_file += utils::file::FileUtils::get_separator();
  state_file += "tailfile.state";

  std::ofstream in_file_stream(in_file);
  in_file_stream << NEWLINE_FILE;
  in_file_stream.flush();  // the stream stays open, so push the content to disk explicitly

  // Build MiNiFi processing graph
  auto tail_file = plan->addProcessor("TailFile", "Tail");
  plan->setProperty(tail_file, processors::TailFile::Delimiter.getName(), std::string(1, DELIM));

  SECTION("single") {
    plan->setProperty(tail_file, processors::TailFile::FileName.getName(), in_file);
  }

  SECTION("Multiple") {
    plan->setProperty(tail_file, processors::TailFile::FileName.getName(), "test.*");
    plan->setProperty(tail_file, processors::TailFile::TailMode.getName(), "Multiple file");
    plan->setProperty(tail_file, processors::TailFile::BaseDirectory.getName(), dir);
  }

  plan->setProperty(tail_file, processors::TailFile::StateFile.getName(), state_file);

  auto log_attr = plan->addProcessor("LogAttribute", "Log", core::Relationship("success", "description"), true);
  plan->setProperty(log_attr, processors::LogAttribute::FlowFilesToLog.getName(), "0");
  plan->setProperty(log_attr, processors::LogAttribute::LogPayload.getName(), "true");

  // Log as many FFs as it can to make sure exactly the expected amount is produced
  plan->runNextProcessor();  // Tail
  plan->runNextProcessor();  // Log

  std::cout << " find " << expected_pieces << std::endl;
  REQUIRE(LogTestController::getInstance().contains(std::string("Logged ") + std::to_string(expected_pieces) + " flow files"));

  // Terminate the last piece, then rotate the file away.
  in_file_stream << DELIM;
  in_file_stream.close();

  std::string rotated_file = (in_file + ".1");
  REQUIRE(rename(in_file.c_str(), rotated_file.c_str()) == 0);

  std::this_thread::sleep_for(std::chrono::milliseconds(1000));  // make sure the new file gets newer modification time

  std::ofstream new_in_file_stream(in_file);
  new_in_file_stream << "five" << DELIM << "six" << DELIM;
  new_in_file_stream.close();

  plan->reset();
  plan->runNextProcessor();  // Tail
  plan->runNextProcessor();  // Log

  // Find the last flow file in the rotated file
  REQUIRE(LogTestController::getInstance().contains("Logged 1 flow files"));

  plan->reset();
  plan->runNextProcessor();  // Tail
  plan->runNextProcessor();  // Log

  // Two new flow files from the new file
  REQUIRE(LogTestController::getInstance().contains("Logged 2 flow files"));
}
/**
 * Creates "fruits.txt2", "fruits.txt1" and "fruits.txt" (in that order, one
 * second apart) holding three, two and one ':'-terminated entries, and
 * expects successive TailFile runs to log three, two and one flow files.
 */
TEST_CASE("TailFileWithMultileRolledOverFiles", "[tailfiletest2]") {
  TestController testController;

  const char DELIM = ':';

  LogTestController::getInstance().setTrace<TestPlan>();
  LogTestController::getInstance().setTrace<processors::TailFile>();
  LogTestController::getInstance().setTrace<processors::LogAttribute>();

  auto plan = testController.createPlan();

  char format[] = "/tmp/gt.XXXXXX";
  auto dir = testController.createTempDirectory(format);

  // Both paths need a separator after the directory; without it the names are
  // glued onto the directory name and the files end up beside the temp
  // directory rather than inside it.
  std::string state_file(dir);
  state_file += utils::file::FileUtils::get_separator();
  state_file += "tailfile.state";

  // Define test input file
  std::string in_file(dir);
  in_file += utils::file::FileUtils::get_separator();
  in_file += "fruits.txt";

  for (int i = 2; 0 <= i; --i) {
    if (i < 2) {
      std::this_thread::sleep_for(std::chrono::milliseconds(1000));  // make sure the new file gets newer modification time
    }
    // i == 0 is the live file; i > 0 gets the index appended to the name.
    std::ofstream in_file_stream(in_file + (i > 0 ? std::to_string(i) : ""));
    for (int j = 0; j <= i; j++) {
      in_file_stream << "Apple" << DELIM;
    }
    in_file_stream.close();
  }

  // Build MiNiFi processing graph
  auto tail_file = plan->addProcessor("TailFile", "Tail");
  plan->setProperty(tail_file, processors::TailFile::Delimiter.getName(), std::string(1, DELIM));
  plan->setProperty(tail_file, processors::TailFile::FileName.getName(), in_file);
  plan->setProperty(tail_file, processors::TailFile::StateFile.getName(), state_file);

  auto log_attr = plan->addProcessor("LogAttribute", "Log", core::Relationship("success", "description"), true);
  plan->setProperty(log_attr, processors::LogAttribute::FlowFilesToLog.getName(), "0");

  // Log as many FFs as it can to make sure exactly the expected amount is produced
  // Each iteration should go through one file and log all flowfiles
  for (int i = 2; 0 <= i; --i) {
    plan->reset();
    plan->runNextProcessor();  // Tail
    plan->runNextProcessor();  // Log
    REQUIRE(LogTestController::getInstance().contains(std::string("Logged ") + std::to_string(i + 1) + " flow files"));
  }

  // Write some more data to the source file
  std::ofstream in_file_stream(in_file);
  in_file_stream << "Pear" << DELIM << "Cherry" << DELIM;
  // Close the stream so the buffered data is actually on disk before the
  // processor runs.
  in_file_stream.close();

  plan->reset();
  plan->runNextProcessor();  // Tail
  plan->runNextProcessor();  // Log

  // NOTE(review): the log is not reset in this test and the loop above already
  // required "Logged 2 flow files" (i == 1), so this check can be satisfied by
  // that earlier output - confirm it verifies what is intended.
  REQUIRE(LogTestController::getInstance().contains(std::string("Logged 2 flow files")));
}