blob: 86f2fbde000a59557d8b25763142733181641dfc [file] [log] [blame]
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <memory>
#include <string>
#include "SingleProcessorTestController.h"
#include "TestBase.h"
#include "Catch.h"
#include "../../script/ExecuteScript.h"
#include "utils/file/FileUtils.h"
#include "utils/file/PathUtils.h"
namespace org::apache::nifi::minifi::processors::test {
TEST_CASE("Script engine is not set", "[executescriptMisconfiguration]") {
TestController test_controller;
auto plan = test_controller.createPlan();
auto execute_script = plan->addProcessor("ExecuteScript", "executeScript");
plan->setProperty(execute_script, ExecuteScript::ScriptEngine, "");
plan->setProperty(execute_script, ExecuteScript::ScriptFile, "/path/to/script.py");
REQUIRE_THROWS_AS(test_controller.runSession(plan, true), minifi::Exception);
}
TEST_CASE("Neither script body nor script file is set", "[executescriptMisconfiguration]") {
TestController test_controller;
auto plan = test_controller.createPlan();
auto execute_script = plan->addProcessor("ExecuteScript", "executeScript");
plan->setProperty(execute_script, ExecuteScript::ScriptEngine, "python");
REQUIRE_THROWS_AS(test_controller.runSession(plan, true), minifi::Exception);
}
TEST_CASE("Test both script body and script file set", "[executescriptMisconfiguration]") {
TestController test_controller;
auto plan = test_controller.createPlan();
auto execute_script = plan->addProcessor("ExecuteScript", "executeScript");
plan->setProperty(execute_script, ExecuteScript::ScriptEngine, "python");
plan->setProperty(execute_script, ExecuteScript::ScriptFile, "/path/to/script.py");
plan->setProperty(execute_script, ExecuteScript::ScriptBody, R"(
def onTrigger(context, session):
log.info('hello from python')
)");
REQUIRE_THROWS_AS(test_controller.runSession(plan, true), minifi::Exception);
}
TEST_CASE("Python: Test session get should return None if there are no flowfiles in the incoming connections") {
const auto execute_script = std::make_shared<ExecuteScript>("ExecuteScript");
minifi::test::SingleProcessorTestController controller{execute_script};
LogTestController::getInstance().setTrace<ExecuteScript>();
execute_script->setProperty(ExecuteScript::ScriptEngine, "python");
execute_script->setProperty(ExecuteScript::ScriptBody, R"(
def onTrigger(context, session):
flow_file = session.get()
if flow_file is not None:
raise Exception("Didn't expect flow_file")
)");
auto result = controller.trigger();
REQUIRE(result.at(ExecuteScript::Success).empty());
REQUIRE(result.at(ExecuteScript::Failure).empty());
}
TEST_CASE("Python: Test Read File", "[executescriptPythonRead]") {
const auto execute_script = std::make_shared<ExecuteScript>("ExecuteScript");
minifi::test::SingleProcessorTestController controller{execute_script};
LogTestController::getInstance().setTrace<ExecuteScript>();
execute_script->setProperty(ExecuteScript::ScriptEngine, "python");
execute_script->setProperty(ExecuteScript::ScriptBody, R"(
import codecs
class ReadCallback(object):
def process(self, input_stream):
content = codecs.getreader('utf-8')(input_stream).read()
log.info('file content: %s' % content)
return len(content)
def onTrigger(context, session):
flow_file = session.get()
if flow_file is not None:
log.info('got flow file: %s' % flow_file.getAttribute('filename'))
session.read(flow_file, ReadCallback())
session.transfer(flow_file, REL_SUCCESS)
)");
auto result = controller.trigger("tempFile");
REQUIRE(result.at(ExecuteScript::Success).size() == 1);
CHECK(controller.plan->getContent(result.at(ExecuteScript::Success)[0]) == "tempFile");
}
TEST_CASE("Python: Test Write File", "[executescriptPythonWrite]") {
const auto execute_script = std::make_shared<ExecuteScript>("ExecuteScript");
minifi::test::SingleProcessorTestController controller{execute_script};
LogTestController::getInstance().setTrace<ExecuteScript>();
execute_script->setProperty(ExecuteScript::ScriptEngine, "python");
execute_script->setProperty(ExecuteScript::ScriptBody, R"(
class WriteCallback(object):
def process(self, output_stream):
new_content = 'hello 2'.encode('utf-8')
output_stream.write(new_content)
return len(new_content)
def onTrigger(context, session):
flow_file = session.get()
if flow_file is not None:
log.info('got flow file: %s' % flow_file.getAttribute('filename'))
session.write(flow_file, WriteCallback())
session.transfer(flow_file, REL_SUCCESS)
)");
auto result = controller.trigger("tempFile");
REQUIRE(result.at(ExecuteScript::Success).size() == 1);
CHECK(controller.plan->getContent(result.at(ExecuteScript::Success)[0]) == "hello 2");
}
TEST_CASE("Python: Test Create", "[executescriptPythonCreate]") {
const auto execute_script = std::make_shared<ExecuteScript>("ExecuteScript");
minifi::test::SingleProcessorTestController controller{execute_script};
LogTestController::getInstance().setTrace<ExecuteScript>();
execute_script->setProperty(ExecuteScript::ScriptEngine, "python");
execute_script->setProperty(ExecuteScript::ScriptBody, R"(
def onTrigger(context, session):
flow_file = session.create()
if flow_file is not None:
log.info('created flow file: %s' % flow_file.getAttribute('filename'))
session.transfer(flow_file, REL_SUCCESS)
)");
auto result = controller.trigger();
REQUIRE(result.at(ExecuteScript::Success).size() == 1);
REQUIRE(result.at(ExecuteScript::Failure).empty());
REQUIRE(LogTestController::getInstance().contains("[info] created flow file:"));
}
TEST_CASE("Python: Test Update Attribute", "[executescriptPythonUpdateAttribute]") {
const auto execute_script = std::make_shared<ExecuteScript>("ExecuteScript");
minifi::test::SingleProcessorTestController controller{execute_script};
LogTestController::getInstance().setTrace<ExecuteScript>();
execute_script->setProperty(ExecuteScript::ScriptEngine, "python");
execute_script->setProperty(ExecuteScript::ScriptBody, R"(
def onTrigger(context, session):
flow_file = session.get()
if flow_file is not None:
log.info('got flow file: %s' % flow_file.getAttribute('filename'))
flow_file.addAttribute('test_attr', '1')
attr = flow_file.getAttribute('test_attr')
log.info('got flow file attr \'test_attr\': %s' % attr)
flow_file.updateAttribute('test_attr', str(int(attr) + 1))
session.transfer(flow_file, REL_SUCCESS)
)");
auto result = controller.trigger("tempFile");
REQUIRE(result.at(ExecuteScript::Success).size() == 1);
CHECK(controller.plan->getContent(result.at(ExecuteScript::Success)[0]) == "tempFile");
CHECK(result.at(ExecuteScript::Success)[0]->getAttribute("test_attr") == "2");
}
TEST_CASE("Python: Test Get Context Property", "[executescriptPythonGetContextProperty]") {
const auto execute_script = std::make_shared<ExecuteScript>("ExecuteScript");
minifi::test::SingleProcessorTestController controller{execute_script};
LogTestController::getInstance().setTrace<ExecuteScript>();
execute_script->setProperty(ExecuteScript::ScriptEngine, "python");
execute_script->setProperty(ExecuteScript::ScriptBody, R"(
def onTrigger(context, session):
script_engine = context.getProperty('Script Engine')
log.info('got Script Engine property: %s' % script_engine)
)");
auto result_without_input = controller.trigger();
REQUIRE(result_without_input.at(ExecuteScript::Success).empty());
REQUIRE(result_without_input.at(ExecuteScript::Failure).empty());
REQUIRE(LogTestController::getInstance().contains("[info] got Script Engine property: python"));
}
TEST_CASE("Python: Test Module Directory property", "[executescriptPythonModuleDirectoryProperty]") {
using org::apache::nifi::minifi::utils::file::get_executable_dir;
const auto execute_script = std::make_shared<ExecuteScript>("ExecuteScript");
minifi::test::SingleProcessorTestController controller{execute_script};
LogTestController::getInstance().setTrace<ExecuteScript>();
const auto script_files_directory = minifi::utils::file::FileUtils::get_executable_dir() / "resources" / "test_python_scripts";
execute_script->setProperty(ExecuteScript::ScriptEngine, "python");
execute_script->setProperty(ExecuteScript::ScriptFile, (script_files_directory / "foo_bar_processor.py").string());
execute_script->setProperty(ExecuteScript::ModuleDirectory, (script_files_directory / "foo_modules" / "foo.py").string() + "," + (script_files_directory / "bar_modules").string());
auto result = controller.trigger("tempFile");
REQUIRE(result.at(ExecuteScript::Success).size() == 1);
REQUIRE(result.at(ExecuteScript::Failure).empty());
REQUIRE(LogTestController::getInstance().contains("foobar"));
}
TEST_CASE("Python: Non existent script file should throw", "[executescriptPythonNonExistentScriptFile]") {
const auto execute_script = std::make_shared<ExecuteScript>("ExecuteScript");
minifi::test::SingleProcessorTestController controller{execute_script};
LogTestController::getInstance().setTrace<ExecuteScript>();
execute_script->setProperty(ExecuteScript::ScriptEngine, "python");
execute_script->setProperty(ExecuteScript::ScriptFile, "/tmp/non-existent-file");
REQUIRE_THROWS_AS(controller.trigger("tempFile"), minifi::Exception);
}
TEST_CASE("Python can remove flowfiles", "[ExecuteScript]") {
const auto execute_script = std::make_shared<ExecuteScript>("ExecuteScript");
minifi::test::SingleProcessorTestController controller{execute_script};
LogTestController::getInstance().setTrace<ExecuteScript>();
execute_script->setProperty(ExecuteScript::ScriptEngine, "python");
execute_script->setProperty(ExecuteScript::ScriptBody, R"(
def onTrigger(context, session):
flow_file = session.get()
session.remove(flow_file))");
auto result = controller.trigger("hello");
REQUIRE(result.at(ExecuteScript::Success).empty());
REQUIRE(result.at(ExecuteScript::Failure).empty());
}
TEST_CASE("Python can store states in StateManager", "[ExecuteScript]") {
const auto execute_script = std::make_shared<ExecuteScript>("ExecuteScript");
minifi::test::SingleProcessorTestController controller{execute_script};
LogTestController::getInstance().setTrace<minifi::processors::ExecuteScript>();
execute_script->setProperty(ExecuteScript::ScriptEngine, "python");
execute_script->setProperty(ExecuteScript::ScriptBody,
R"(
def onTrigger(context, session):
state_manager = context.getStateManager()
state = state_manager.get()
if state is None:
state = {}
state['python_trigger_count'] = 0
python_trigger_count = state['python_trigger_count']
log.info('python_trigger_count: ' + str(python_trigger_count))
state['python_trigger_count'] =str(int(python_trigger_count) + 1)
state_manager.set(state)
)");
for (size_t i = 0; i < 4; ++i) {
controller.trigger();
CHECK(LogTestController::getInstance().contains(fmt::format("python_trigger_count: {}", i)));
}
}
} // namespace org::apache::nifi::minifi::processors::test