blob: 2d422e2bfb1ebb25433066de50fa12c72c60e8ac [file] [log] [blame]
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <sys/stat.h>
#include <cstdio>
#include <utility>
#include <memory>
#include <string>
#include <vector>
#include <set>
#include <fstream>
#include "utils/file/FileUtils.h"
#include "TestBase.h"
#include "processors/LogAttribute.h"
#include "processors/GetFile.h"
#include "processors/PutFile.h"
#include "unit/ProvenanceTestHelper.h"
#include "core/Core.h"
#include "core/FlowFile.h"
#include "core/Processor.h"
#include "core/ProcessContext.h"
#include "core/ProcessSession.h"
#include "core/ProcessorNode.h"
#include "core/reporting/SiteToSiteProvenanceReportingTask.h"
TEST_CASE("Test Creation of PutFile", "[getfileCreate]") {
TestController testController;
std::shared_ptr<core::Processor> processor = std::make_shared<org::apache::nifi::minifi::processors::PutFile>("processorname");
REQUIRE(processor->getName() == "processorname");
}
TEST_CASE("PutFileTest", "[getfileputpfile]") {
TestController testController;
LogTestController::getInstance().setDebug<minifi::processors::GetFile>();
LogTestController::getInstance().setDebug<TestPlan>();
LogTestController::getInstance().setDebug<minifi::processors::PutFile>();
LogTestController::getInstance().setDebug<minifi::processors::PutFile::ReadCallback>();
LogTestController::getInstance().setDebug<minifi::processors::LogAttribute>();
std::shared_ptr<TestPlan> plan = testController.createPlan();
std::shared_ptr<core::Processor> getfile = plan->addProcessor("GetFile", "getfileCreate2");
std::shared_ptr<core::Processor> putfile = plan->addProcessor("PutFile", "putfile", core::Relationship("success", "description"), true);
plan->addProcessor("LogAttribute", "logattribute", core::Relationship("success", "description"), true);
char format[] = "/tmp/gt.XXXXXX";
auto dir = testController.createTempDirectory(format);
char format2[] = "/tmp/ft.XXXXXX";
auto putfiledir = testController.createTempDirectory(format2);
plan->setProperty(getfile, org::apache::nifi::minifi::processors::GetFile::Directory.getName(), dir);
plan->setProperty(putfile, org::apache::nifi::minifi::processors::PutFile::Directory.getName(), putfiledir);
testController.runSession(plan, false);
auto records = plan->getProvenanceRecords();
std::shared_ptr<core::FlowFile> record = plan->getCurrentFlowFile();
REQUIRE(record == nullptr);
REQUIRE(records.size() == 0);
std::fstream file;
std::stringstream ss;
ss << dir << utils::file::FileUtils::get_separator() << "tstFile.ext";
file.open(ss.str(), std::ios::out);
file << "tempFile";
file.close();
plan->reset();
testController.runSession(plan, false);
testController.runSession(plan, false);
records = plan->getProvenanceRecords();
record = plan->getCurrentFlowFile();
testController.runSession(plan, false);
std::remove(ss.str().c_str());
REQUIRE(true == LogTestController::getInstance().contains("key:absolute.path value:" + ss.str()));
REQUIRE(true == LogTestController::getInstance().contains("Size:8 Offset:0"));
REQUIRE(true == LogTestController::getInstance().contains("key:path value:" + std::string(dir)));
// verify that the fle was moved
REQUIRE(false == std::ifstream(ss.str()).good());
std::stringstream movedFile;
movedFile << putfiledir << utils::file::FileUtils::get_separator() << "tstFile.ext";
REQUIRE(true == std::ifstream(movedFile.str()).good());
file.open(movedFile.str(), std::ios::in);
std::string contents((std::istreambuf_iterator<char>(file)),
std::istreambuf_iterator<char>());
REQUIRE("tempFile" == contents);
file.close();
LogTestController::getInstance().reset();
}
TEST_CASE("PutFileTestFileExists", "[getfileputpfile]") {
TestController testController;
LogTestController::getInstance().setDebug<minifi::processors::GetFile>();
LogTestController::getInstance().setDebug<TestPlan>();
LogTestController::getInstance().setDebug<minifi::processors::PutFile>();
LogTestController::getInstance().setDebug<minifi::processors::LogAttribute>();
std::shared_ptr<TestPlan> plan = testController.createPlan();
std::shared_ptr<core::Processor> getfile = plan->addProcessor("GetFile", "getfileCreate2");
std::shared_ptr<core::Processor> putfile = plan->addProcessor("PutFile", "putfile", core::Relationship("success", "description"), true);
plan->addProcessor("LogAttribute", "logattribute", core::Relationship("failure", "description"), true);
char format[] = "/tmp/gt.XXXXXX";
auto dir = testController.createTempDirectory(format);
char format2[] = "/tmp/ft.XXXXXX";
auto putfiledir = testController.createTempDirectory(format2);
plan->setProperty(getfile, org::apache::nifi::minifi::processors::GetFile::Directory.getName(), dir);
plan->setProperty(putfile, org::apache::nifi::minifi::processors::PutFile::Directory.getName(), putfiledir);
testController.runSession(plan, false);
auto records = plan->getProvenanceRecords();
std::shared_ptr<core::FlowFile> record = plan->getCurrentFlowFile();
REQUIRE(record == nullptr);
REQUIRE(records.size() == 0);
std::fstream file;
std::stringstream ss;
ss << dir << utils::file::FileUtils::get_separator() << "tstFile.ext";
file.open(ss.str(), std::ios::out);
file << "tempFile";
file.close();
//
std::stringstream movedFile;
movedFile << putfiledir << utils::file::FileUtils::get_separator() << "tstFile.ext";
file.open(movedFile.str(), std::ios::out);
file << "tempFile";
file.close();
plan->reset();
testController.runSession(plan, false);
testController.runSession(plan, false);
records = plan->getProvenanceRecords();
record = plan->getCurrentFlowFile();
testController.runSession(plan, false);
std::remove(ss.str().c_str());
REQUIRE(true == LogTestController::getInstance().contains("key:absolute.path value:" + ss.str()));
REQUIRE(true == LogTestController::getInstance().contains("Size:8 Offset:0"));
REQUIRE(true == LogTestController::getInstance().contains("key:path value:" + std::string(dir)));
// verify that the fle was moved
REQUIRE(false == std::ifstream(ss.str()).good());
REQUIRE(true == std::ifstream(movedFile.str()).good());
LogTestController::getInstance().reset();
}
TEST_CASE("PutFileTestFileExistsIgnore", "[getfileputpfile]") {
TestController testController;
LogTestController::getInstance().setDebug<minifi::processors::GetFile>();
LogTestController::getInstance().setDebug<TestPlan>();
LogTestController::getInstance().setDebug<minifi::processors::PutFile>();
LogTestController::getInstance().setDebug<minifi::processors::LogAttribute>();
std::shared_ptr<TestPlan> plan = testController.createPlan();
std::shared_ptr<core::Processor> getfile = plan->addProcessor("GetFile", "getfileCreate2");
std::shared_ptr<core::Processor> putfile = plan->addProcessor("PutFile", "putfile", core::Relationship("success", "description"), true);
plan->addProcessor("LogAttribute", "logattribute", core::Relationship("success", "description"), true);
char format[] = "/tmp/gt.XXXXXX";
auto dir = testController.createTempDirectory(format);
char format2[] = "/tmp/ft.XXXXXX";
auto putfiledir = testController.createTempDirectory(format2);
plan->setProperty(getfile, org::apache::nifi::minifi::processors::GetFile::Directory.getName(), dir);
plan->setProperty(putfile, org::apache::nifi::minifi::processors::PutFile::Directory.getName(), putfiledir);
plan->setProperty(putfile, org::apache::nifi::minifi::processors::PutFile::ConflictResolution.getName(), "ignore");
testController.runSession(plan, false);
auto records = plan->getProvenanceRecords();
std::shared_ptr<core::FlowFile> record = plan->getCurrentFlowFile();
REQUIRE(record == nullptr);
REQUIRE(records.size() == 0);
std::fstream file;
std::stringstream ss;
ss << dir << utils::file::FileUtils::get_separator() << "tstFile.ext";
file.open(ss.str(), std::ios::out);
file << "tempFile";
file.close();
//
std::stringstream movedFile;
movedFile << putfiledir << utils::file::FileUtils::get_separator() << "tstFile.ext";
file.open(movedFile.str(), std::ios::out);
file << "tempFile";
file.close();
auto filemodtime = fileutils::FileUtils::last_write_time(movedFile.str());
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
plan->reset();
testController.runSession(plan, false);
testController.runSession(plan, false);
records = plan->getProvenanceRecords();
record = plan->getCurrentFlowFile();
testController.runSession(plan, false);
std::remove(ss.str().c_str());
REQUIRE(true == LogTestController::getInstance().contains("key:absolute.path value:" + ss.str()));
REQUIRE(true == LogTestController::getInstance().contains("Size:8 Offset:0"));
REQUIRE(true == LogTestController::getInstance().contains("key:path value:" + dir));
// verify that the fle was moved
REQUIRE(false == std::ifstream(ss.str()).good());
REQUIRE(true == std::ifstream(movedFile.str()).good());
REQUIRE(filemodtime == fileutils::FileUtils::last_write_time(movedFile.str()));
LogTestController::getInstance().reset();
}
TEST_CASE("PutFileTestFileExistsReplace", "[getfileputpfile]") {
TestController testController;
LogTestController::getInstance().setDebug<minifi::processors::GetFile>();
LogTestController::getInstance().setDebug<TestPlan>();
LogTestController::getInstance().setDebug<minifi::processors::PutFile>();
LogTestController::getInstance().setDebug<minifi::processors::LogAttribute>();
std::shared_ptr<TestPlan> plan = testController.createPlan();
std::shared_ptr<core::Processor> getfile = plan->addProcessor("GetFile", "getfileCreate2");
std::shared_ptr<core::Processor> putfile = plan->addProcessor("PutFile", "putfile", core::Relationship("success", "description"), true);
plan->addProcessor("LogAttribute", "logattribute", { core::Relationship("success", "d"), core::Relationship("failure", "d") }, true);
char format[] = "/tmp/gt.XXXXXX";
auto dir = testController.createTempDirectory(format);
char format2[] = "/tmp/ft.XXXXXX";
auto putfiledir = testController.createTempDirectory(format2);
plan->setProperty(getfile, org::apache::nifi::minifi::processors::GetFile::Directory.getName(), dir);
plan->setProperty(putfile, org::apache::nifi::minifi::processors::PutFile::Directory.getName(), putfiledir);
plan->setProperty(putfile, org::apache::nifi::minifi::processors::PutFile::ConflictResolution.getName(), "replace");
testController.runSession(plan, false);
auto records = plan->getProvenanceRecords();
std::shared_ptr<core::FlowFile> record = plan->getCurrentFlowFile();
REQUIRE(record == nullptr);
REQUIRE(records.size() == 0);
std::fstream file;
std::stringstream ss;
ss << dir << utils::file::FileUtils::get_separator() << "tstFile.ext";
file.open(ss.str(), std::ios::out);
file << "tempFile";
file.close();
//
std::stringstream movedFile;
movedFile << putfiledir << utils::file::FileUtils::get_separator() << "tstFile.ext";
file.open(movedFile.str(), std::ios::out);
file << "tempFile";
file.close();
auto filemodtime = fileutils::FileUtils::last_write_time(movedFile.str());
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
plan->reset();
testController.runSession(plan, false);
testController.runSession(plan, false);
records = plan->getProvenanceRecords();
record = plan->getCurrentFlowFile();
testController.runSession(plan, false);
std::remove(ss.str().c_str());
REQUIRE(true == LogTestController::getInstance().contains("key:absolute.path value:" + ss.str()));
REQUIRE(true == LogTestController::getInstance().contains("Size:8 Offset:0"));
REQUIRE(true == LogTestController::getInstance().contains("key:path value:" + std::string(dir)));
// verify that the fle was moved
REQUIRE(false == std::ifstream(ss.str()).good());
REQUIRE(true == std::ifstream(movedFile.str()).good());
#ifndef WIN32
REQUIRE(filemodtime != fileutils::FileUtils::last_write_time(movedFile.str()));
#endif
LogTestController::getInstance().reset();
}
TEST_CASE("Test generation of temporary write path", "[putfileTmpWritePath]") {
auto processor = std::make_shared<org::apache::nifi::minifi::processors::PutFile>("processorname");
std::stringstream prefix;
prefix << "a" << utils::file::FileUtils::get_separator() << "b" << utils::file::FileUtils::get_separator();
std::string path = prefix.str() + "c";
std::string expected_path = prefix.str() + ".c";
REQUIRE(processor->tmpWritePath(path, "").substr(1, expected_path.length()) == expected_path);
}
TEST_CASE("PutFileMaxFileCountTest", "[getfileputpfilemaxcount]") {
TestController testController;
LogTestController::getInstance().setDebug<minifi::processors::GetFile>();
LogTestController::getInstance().setDebug<TestPlan>();
LogTestController::getInstance().setDebug<minifi::processors::PutFile>();
LogTestController::getInstance().setDebug<minifi::processors::LogAttribute>();
std::shared_ptr<TestPlan> plan = testController.createPlan();
std::shared_ptr<core::Processor> getfile = plan->addProcessor("GetFile", "getfileCreate");
std::shared_ptr<core::Processor> putfile = plan->addProcessor("PutFile", "putfile", core::Relationship("success", "description"), true);
plan->addProcessor("LogAttribute", "logattribute", { core::Relationship("success", "d"), core::Relationship("failure", "d") }, true);
char format[] = "/tmp/gt.XXXXXX";
const auto dir = testController.createTempDirectory(format);
char format2[] = "/tmp/ft.XXXXXX";
auto putfiledir = testController.createTempDirectory(format2);
plan->setProperty(getfile, org::apache::nifi::minifi::processors::GetFile::Directory.getName(), dir);
plan->setProperty(getfile, org::apache::nifi::minifi::processors::GetFile::BatchSize.getName(), "1");
plan->setProperty(putfile, org::apache::nifi::minifi::processors::PutFile::Directory.getName(), putfiledir);
plan->setProperty(putfile, org::apache::nifi::minifi::processors::PutFile::MaxDestFiles.getName(), "1");
for (int i = 0; i < 2; ++i) {
std::stringstream ss;
ss << dir << utils::file::FileUtils::get_separator() << "tstFile" << i << ".ext";
std::fstream file;
file.open(ss.str(), std::ios::out);
file << "tempFile";
file.close();
}
plan->reset();
testController.runSession(plan);
plan->reset();
testController.runSession(plan);
REQUIRE(LogTestController::getInstance().contains("key:absolute.path value:" + std::string(dir) + utils::file::FileUtils::get_separator() + "tstFile0.ext"));
REQUIRE(LogTestController::getInstance().contains("Size:8 Offset:0"));
REQUIRE(LogTestController::getInstance().contains("key:path value:" + std::string(dir)));
// Only 1 of the 2 files should make it to the target dir
// Non-determistic, so let's just count them
int files_in_dir = 0;
for (int i = 0; i < 2; ++i) {
std::stringstream ss;
ss << putfiledir << utils::file::FileUtils::get_separator() << "tstFile" << i << ".ext";
std::ifstream file(ss.str());
if (file.is_open() && file.good()) {
files_in_dir++;
file.close();
}
}
REQUIRE(files_in_dir == 1);
REQUIRE(LogTestController::getInstance().contains("which exceeds the configured max number of files"));
LogTestController::getInstance().reset();
}
TEST_CASE("PutFileEmptyTest", "[EmptyFilePutTest]") {
TestController testController;
LogTestController::getInstance().setDebug<minifi::processors::GetFile>();
LogTestController::getInstance().setDebug<TestPlan>();
LogTestController::getInstance().setDebug<minifi::processors::PutFile>();
std::shared_ptr<TestPlan> plan = testController.createPlan();
std::shared_ptr<core::Processor> getfile = plan->addProcessor("GetFile", "getfileCreate2");
std::shared_ptr<core::Processor> putfile = plan->addProcessor("PutFile", "putfile", core::Relationship("success", "description"), true);
char format[] = "/tmp/gt.XXXXXX";
auto dir = testController.createTempDirectory(format);
char format2[] = "/tmp/ft.XXXXXX";
auto putfiledir = testController.createTempDirectory(format2);
plan->setProperty(getfile, org::apache::nifi::minifi::processors::GetFile::Directory.getName(), dir);
plan->setProperty(putfile, org::apache::nifi::minifi::processors::PutFile::Directory.getName(), putfiledir);
std::ofstream of(std::string(dir) + utils::file::FileUtils::get_separator() + "tstFile.ext");
of.close();
plan->runNextProcessor(); // Get
plan->runNextProcessor(); // Put
std::ifstream is(std::string(putfiledir) + utils::file::FileUtils::get_separator() + "tstFile.ext", std::ifstream::binary);
REQUIRE(is.is_open());
is.seekg(0, is.end);
REQUIRE(is.tellg() == 0);
}
#ifndef WIN32
TEST_CASE("TestPutFilePermissions", "[PutFilePermissions]") {
TestController testController;
LogTestController::getInstance().setDebug<minifi::processors::GetFile>();
LogTestController::getInstance().setDebug<TestPlan>();
LogTestController::getInstance().setDebug<minifi::processors::PutFile>();
std::shared_ptr<TestPlan> plan = testController.createPlan();
std::shared_ptr<core::Processor> getfile = plan->addProcessor("GetFile", "getfileCreate2");
std::shared_ptr<core::Processor> putfile = plan->addProcessor("PutFile", "putfile", core::Relationship("success", "description"), true);
char format[] = "/tmp/gt.XXXXXX";
auto dir = testController.createTempDirectory(format);
char format2[] = "/tmp/ft.XXXXXX";
auto putfiledir = testController.createTempDirectory(format2) + utils::file::FileUtils::get_separator() + "test_dir";
plan->setProperty(getfile, org::apache::nifi::minifi::processors::GetFile::Directory.getName(), dir);
plan->setProperty(putfile, org::apache::nifi::minifi::processors::PutFile::Directory.getName(), putfiledir);
plan->setProperty(putfile, org::apache::nifi::minifi::processors::PutFile::Permissions.getName(), "644");
plan->setProperty(putfile, org::apache::nifi::minifi::processors::PutFile::DirectoryPermissions.getName(), "0777");
std::fstream file;
file.open(std::string(dir) + utils::file::FileUtils::get_separator() + "tstFile.ext", std::ios::out);
file << "tempFile";
file.close();
plan->runNextProcessor(); // Get
plan->runNextProcessor(); // Put
auto path = std::string(putfiledir) + utils::file::FileUtils::get_separator() + "tstFile.ext";
uint32_t perms = 0;
REQUIRE(utils::file::FileUtils::get_permissions(path, perms));
REQUIRE(perms == 0644);
REQUIRE(utils::file::FileUtils::get_permissions(putfiledir, perms));
REQUIRE(perms == 0777);
}
#endif