// blob: 9e85d72076eb3fc15f75fef823a2062670b2c27a [file] [log] [blame]
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#define CATCH_CONFIG_MAIN
#include <memory>
#include <string>
#include <set>
#include "../TestBase.h"
#include <ExecuteScript.h>
#include "processors/LogAttribute.h"
#include "processors/GetFile.h"
#include "processors/PutFile.h"
/**
 * Python ExecuteScript test: reading flow file content via session.read().
 *
 * Processors are added to the plan in the order GetFile, LogAttribute,
 * ExecuteScript, PutFile. The script logs the content it reads and transfers
 * the flow file to REL_SUCCESS. The test then checks the expected log line and
 * that the file found in the PutFile directory still contains "tempFile".
 */
TEST_CASE("Python: Test Read File", "[executescriptPythonRead]") { // NOLINT
  TestController testController;

  LogTestController &logTestController = LogTestController::getInstance();
  logTestController.setDebug<TestPlan>();
  logTestController.setDebug<minifi::processors::LogAttribute>();
  logTestController.setDebug<minifi::processors::ExecuteScript>();

  auto plan = testController.createPlan();

  auto getFile = plan->addProcessor("GetFile", "getFile");
  // LogAttribute is only needed as part of the plan; no handle is kept.
  plan->addProcessor("LogAttribute", "logAttribute",
                     core::Relationship("success", "description"),
                     true);
  auto executeScript = plan->addProcessor("ExecuteScript",
                                          "executeScript",
                                          core::Relationship("success", "description"),
                                          true);
  auto putFile = plan->addProcessor("PutFile", "putFile", core::Relationship("success", "description"), true);

  // Python is indentation-sensitive: nested bodies must be indented, and the
  // top-level statements are kept at column 0 of the raw string so the script
  // parses without relying on any dedenting of the script body.
  plan->setProperty(executeScript, processors::ExecuteScript::ScriptBody.getName(), R"(
import codecs

class ReadCallback(object):
  def process(self, input_stream):
    content = codecs.getreader('utf-8')(input_stream).read()
    log.info('file content: %s' % content)
    return len(content)

def onTrigger(context, session):
  flow_file = session.get()

  if flow_file is not None:
    log.info('got flow file: %s' % flow_file.getAttribute('filename'))
    session.read(flow_file, ReadCallback())
    session.transfer(flow_file, REL_SUCCESS)
)");

  char getFileDirFmt[] = "/tmp/ft.XXXXXX";
  auto getFileDir = testController.createTempDirectory(getFileDirFmt);
  plan->setProperty(getFile, processors::GetFile::Directory.getName(), getFileDir);

  char putFileDirFmt[] = "/tmp/ft.XXXXXX";
  auto putFileDir = testController.createTempDirectory(putFileDirFmt);
  plan->setProperty(putFile, processors::PutFile::Directory.getName(), putFileDir);

  // First run happens before any input file exists: no flow file and no
  // provenance records are expected.
  testController.runSession(plan, false);

  auto records = plan->getProvenanceRecords();
  std::shared_ptr<core::FlowFile> record = plan->getCurrentFlowFile();
  REQUIRE(record == nullptr);
  REQUIRE(records.empty());

  // Create the input file that GetFile is configured to pick up.
  std::fstream file;
  std::stringstream ss;
  ss << getFileDir << "/" << "tstFile.ext";
  file.open(ss.str(), std::ios::out);
  file << "tempFile";
  file.close();
  plan->reset();

  // NOTE(review): presumably each runSession call advances the plan by one
  // processor - confirm against TestPlan.
  testController.runSession(plan, false);
  testController.runSession(plan, false);
  testController.runSession(plan, false);

  records = plan->getProvenanceRecords();
  record = plan->getCurrentFlowFile();
  testController.runSession(plan, false);

  // Remove the input file (if still present) so nothing is left behind.
  unlink(ss.str().c_str());

  // The script must have read and logged the original content.
  REQUIRE(logTestController.contains("[info] file content: tempFile"));

  // The input path must no longer be readable ...
  REQUIRE(!std::ifstream(ss.str()).good());

  // ... and the file must exist in the PutFile directory.
  std::stringstream movedFile;
  movedFile << putFileDir << "/" << "tstFile.ext";
  REQUIRE(std::ifstream(movedFile.str()).good());

  // Verify that file content was preserved
  file.open(movedFile.str(), std::ios::in);
  std::string contents((std::istreambuf_iterator<char>(file)),
                       std::istreambuf_iterator<char>());
  REQUIRE("tempFile" == contents);
  file.close();

  logTestController.reset();
}
/**
 * Python ExecuteScript test: replacing flow file content via session.write().
 *
 * Processors are added to the plan in the order GetFile, LogAttribute,
 * ExecuteScript, PutFile. The script overwrites the flow file content with
 * "hello 2" and transfers it to REL_SUCCESS. The test then checks that the
 * file found in the PutFile directory contains exactly "hello 2".
 */
TEST_CASE("Python: Test Write File", "[executescriptPythonWrite]") { // NOLINT
  TestController testController;

  LogTestController &logTestController = LogTestController::getInstance();
  logTestController.setDebug<TestPlan>();
  logTestController.setDebug<minifi::processors::LogAttribute>();
  logTestController.setDebug<minifi::processors::ExecuteScript>();

  auto plan = testController.createPlan();

  auto getFile = plan->addProcessor("GetFile", "getFile");
  // LogAttribute is only needed as part of the plan; no handle is kept.
  plan->addProcessor("LogAttribute", "logAttribute",
                     core::Relationship("success", "description"),
                     true);
  auto executeScript = plan->addProcessor("ExecuteScript",
                                          "executeScript",
                                          core::Relationship("success", "description"),
                                          true);
  auto putFile = plan->addProcessor("PutFile", "putFile", core::Relationship("success", "description"), true);

  // Python is indentation-sensitive: nested bodies must be indented, and the
  // top-level statements are kept at column 0 of the raw string so the script
  // parses without relying on any dedenting of the script body.
  plan->setProperty(executeScript, processors::ExecuteScript::ScriptBody.getName(), R"(
class WriteCallback(object):
  def process(self, output_stream):
    new_content = 'hello 2'.encode('utf-8')
    output_stream.write(new_content)
    return len(new_content)

def onTrigger(context, session):
  flow_file = session.get()

  if flow_file is not None:
    log.info('got flow file: %s' % flow_file.getAttribute('filename'))
    session.write(flow_file, WriteCallback())
    session.transfer(flow_file, REL_SUCCESS)
)");

  char getFileDirFmt[] = "/tmp/ft.XXXXXX";
  auto getFileDir = testController.createTempDirectory(getFileDirFmt);
  plan->setProperty(getFile, processors::GetFile::Directory.getName(), getFileDir);

  char putFileDirFmt[] = "/tmp/ft.XXXXXX";
  auto putFileDir = testController.createTempDirectory(putFileDirFmt);
  plan->setProperty(putFile, processors::PutFile::Directory.getName(), putFileDir);

  // First run happens before any input file exists: no flow file and no
  // provenance records are expected.
  testController.runSession(plan, false);

  auto records = plan->getProvenanceRecords();
  std::shared_ptr<core::FlowFile> record = plan->getCurrentFlowFile();
  REQUIRE(record == nullptr);
  REQUIRE(records.empty());

  // Create the input file that GetFile is configured to pick up.
  std::fstream file;
  std::stringstream ss;
  ss << getFileDir << "/" << "tstFile.ext";
  file.open(ss.str(), std::ios::out);
  file << "tempFile";
  file.close();
  plan->reset();

  // NOTE(review): presumably each runSession call advances the plan by one
  // processor - confirm against TestPlan.
  testController.runSession(plan, false);
  testController.runSession(plan, false);
  testController.runSession(plan, false);

  records = plan->getProvenanceRecords();
  record = plan->getCurrentFlowFile();
  testController.runSession(plan, false);

  // Remove the input file (if still present) so nothing is left behind.
  unlink(ss.str().c_str());

  // The input path must no longer be readable ...
  REQUIRE(!std::ifstream(ss.str()).good());

  // ... and the file must exist in the PutFile directory.
  std::stringstream movedFile;
  movedFile << putFileDir << "/" << "tstFile.ext";
  REQUIRE(std::ifstream(movedFile.str()).good());

  // Verify new content was written
  file.open(movedFile.str(), std::ios::in);
  std::string contents((std::istreambuf_iterator<char>(file)),
                       std::istreambuf_iterator<char>());
  REQUIRE("hello 2" == contents);
  file.close();

  logTestController.reset();
}
/**
 * Python ExecuteScript test: creating a flow file via session.create().
 *
 * The plan contains only an ExecuteScript processor. The script creates a flow
 * file, logs its 'filename' attribute and transfers it to REL_SUCCESS. The
 * test passes when the "created flow file:" log line is observed.
 */
TEST_CASE("Python: Test Create", "[executescriptPythonCreate]") { // NOLINT
  TestController testController;

  LogTestController &logTestController = LogTestController::getInstance();
  logTestController.setDebug<TestPlan>();
  logTestController.setDebug<minifi::processors::ExecuteScript>();

  auto plan = testController.createPlan();

  auto executeScript = plan->addProcessor("ExecuteScript",
                                          "executeScript");

  // Python is indentation-sensitive: nested bodies must be indented, and the
  // top-level statements are kept at column 0 of the raw string so the script
  // parses without relying on any dedenting of the script body.
  plan->setProperty(executeScript, processors::ExecuteScript::ScriptBody.getName(), R"(
def onTrigger(context, session):
  flow_file = session.create()

  if flow_file is not None:
    log.info('created flow file: %s' % flow_file.getAttribute('filename'))
    session.transfer(flow_file, REL_SUCCESS)
)");

  plan->reset();

  testController.runSession(plan, false);

  REQUIRE(logTestController.contains("[info] created flow file:"));

  logTestController.reset();
}
/**
 * Python ExecuteScript test: adding and updating a flow file attribute.
 *
 * Processors are added to the plan in the order GetFile, ExecuteScript,
 * LogAttribute. The script adds 'test_attr' with value '1', reads it back and
 * updates it to the incremented value. The test passes when the log contains
 * "key:test_attr value:2".
 */
TEST_CASE("Python: Test Update Attribute", "[executescriptPythonUpdateAttribute]") { // NOLINT
  TestController testController;

  LogTestController &logTestController = LogTestController::getInstance();
  logTestController.setDebug<TestPlan>();
  logTestController.setDebug<minifi::processors::LogAttribute>();
  logTestController.setDebug<minifi::processors::ExecuteScript>();

  auto plan = testController.createPlan();

  auto getFile = plan->addProcessor("GetFile", "getFile");
  auto executeScript = plan->addProcessor("ExecuteScript",
                                          "executeScript",
                                          core::Relationship("success", "description"),
                                          true);
  // LogAttribute is only needed as part of the plan; no handle is kept.
  plan->addProcessor("LogAttribute", "logAttribute",
                     core::Relationship("success", "description"),
                     true);

  // Python is indentation-sensitive: nested bodies must be indented, and the
  // top-level statements are kept at column 0 of the raw string so the script
  // parses without relying on any dedenting of the script body.
  plan->setProperty(executeScript, processors::ExecuteScript::ScriptBody.getName(), R"(
def onTrigger(context, session):
  flow_file = session.get()

  if flow_file is not None:
    log.info('got flow file: %s' % flow_file.getAttribute('filename'))
    flow_file.addAttribute('test_attr', '1')
    attr = flow_file.getAttribute('test_attr')
    log.info('got flow file attr \'test_attr\': %s' % attr)
    flow_file.updateAttribute('test_attr', str(int(attr) + 1))
    session.transfer(flow_file, REL_SUCCESS)
)");

  char getFileDirFmt[] = "/tmp/ft.XXXXXX";
  auto getFileDir = testController.createTempDirectory(getFileDirFmt);
  plan->setProperty(getFile, processors::GetFile::Directory.getName(), getFileDir);

  // Create the input file that GetFile is configured to pick up.
  std::fstream file;
  std::stringstream ss;
  ss << getFileDir << "/" << "tstFile.ext";
  file.open(ss.str(), std::ios::out);
  file << "tempFile";
  file.close();
  plan->reset();

  // NOTE(review): presumably each runSession call advances the plan by one
  // processor - confirm against TestPlan.
  testController.runSession(plan, false);
  testController.runSession(plan, false);
  testController.runSession(plan, false);

  // '1' incremented by the script must show up as '2' in the logged attributes.
  REQUIRE(logTestController.contains("key:test_attr value:2"));

  logTestController.reset();
}
/**
 * Python ExecuteScript test: reading a processor property from the context.
 *
 * Processors are added to the plan in the order GetFile, ExecuteScript,
 * LogAttribute. The script fetches the 'Script Engine' property through
 * context.getProperty() and logs it. The test passes when the log contains
 * "[info] got Script Engine property: python".
 */
TEST_CASE("Python: Test Get Context Property", "[executescriptPythonGetContextProperty]") { // NOLINT
  TestController testController;

  LogTestController &logTestController = LogTestController::getInstance();
  logTestController.setDebug<TestPlan>();
  logTestController.setDebug<minifi::processors::LogAttribute>();
  logTestController.setDebug<minifi::processors::ExecuteScript>();

  auto plan = testController.createPlan();

  auto getFile = plan->addProcessor("GetFile", "getFile");
  auto executeScript = plan->addProcessor("ExecuteScript",
                                          "executeScript",
                                          core::Relationship("success", "description"),
                                          true);
  // LogAttribute is only needed as part of the plan; no handle is kept.
  plan->addProcessor("LogAttribute", "logAttribute",
                     core::Relationship("success", "description"),
                     true);

  // Python is indentation-sensitive: the function body must be indented, and
  // the top-level statements are kept at column 0 of the raw string so the
  // script parses without relying on any dedenting of the script body.
  plan->setProperty(executeScript, processors::ExecuteScript::ScriptBody.getName(), R"(
def onTrigger(context, session):
  script_engine = context.getProperty('Script Engine')
  log.info('got Script Engine property: %s' % script_engine)
)");

  char getFileDirFmt[] = "/tmp/ft.XXXXXX";
  auto getFileDir = testController.createTempDirectory(getFileDirFmt);
  plan->setProperty(getFile, processors::GetFile::Directory.getName(), getFileDir);

  // Create the input file that GetFile is configured to pick up.
  std::fstream file;
  std::stringstream ss;
  ss << getFileDir << "/" << "tstFile.ext";
  file.open(ss.str(), std::ios::out);
  file << "tempFile";
  file.close();
  plan->reset();

  // NOTE(review): presumably each runSession call advances the plan by one
  // processor - confirm against TestPlan.
  testController.runSession(plan, false);
  testController.runSession(plan, false);
  testController.runSession(plan, false);

  REQUIRE(logTestController.contains("[info] got Script Engine property: python"));

  logTestController.reset();
}