// blob: 992a4bb5df5f79287bd98381b9a1ae9dfe938a2e [file] [log] [blame]
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <filesystem>
#include <memory>
#include <string>
#include "SingleProcessorTestController.h"
#include "TestBase.h"
#include "Catch.h"
#include "../../script/ExecuteScript.h"
#include "utils/file/FileUtils.h"
#include "utils/file/PathUtils.h"
namespace org::apache::nifi::minifi::processors::test {
// Misconfiguration: with an empty Script Engine property, running the session
// is expected to throw minifi::Exception.
TEST_CASE("Script engine is not set", "[executescriptMisconfiguration]") {
  TestController controller;
  auto test_plan = controller.createPlan();
  auto script_processor = test_plan->addProcessor("ExecuteScript", "executeScript");

  // The engine is left blank on purpose; only a script file path is supplied.
  test_plan->setProperty(script_processor, ExecuteScript::ScriptEngine, "");
  test_plan->setProperty(script_processor, ExecuteScript::ScriptFile, "/path/to/script.lua");

  REQUIRE_THROWS_AS(controller.runSession(test_plan, true), minifi::Exception);
}
// Misconfiguration: only the engine is configured (no Script Body, no Script
// File), so running the session is expected to throw minifi::Exception.
TEST_CASE("Neither script body nor script file is set", "[executescriptMisconfiguration]") {
  TestController controller;
  auto test_plan = controller.createPlan();
  auto script_processor = test_plan->addProcessor("ExecuteScript", "executeScript");

  test_plan->setProperty(script_processor, ExecuteScript::ScriptEngine, "lua");

  REQUIRE_THROWS_AS(controller.runSession(test_plan, true), minifi::Exception);
}
// Misconfiguration: Script File and Script Body are both set at the same time,
// so running the session is expected to throw minifi::Exception.
TEST_CASE("Test both script body and script file set", "[executescriptMisconfiguration]") {
  // Inline Lua body that is supplied in addition to the script file path.
  const char* const lua_script = R"(
function onTrigger(context, session)
log:info('hello from lua')
end
)";

  TestController controller;
  auto test_plan = controller.createPlan();
  auto script_processor = test_plan->addProcessor("ExecuteScript", "executeScript");

  test_plan->setProperty(script_processor, ExecuteScript::ScriptEngine, "lua");
  test_plan->setProperty(script_processor, ExecuteScript::ScriptFile, "/path/to/script.lua");
  test_plan->setProperty(script_processor, ExecuteScript::ScriptBody, lua_script);

  REQUIRE_THROWS_AS(controller.runSession(test_plan, true), minifi::Exception);
}
// The processor is triggered without any input. The Lua script raises an error
// if session:get() returns something other than nil; the test then verifies
// that nothing was routed to Success or Failure.
TEST_CASE("Lua: Test session get should return None if there are no flowfiles in the incoming connections") {
  const char* const lua_script = R"(
function onTrigger(context, session)
flow_file = session:get()
if flow_file ~= nil then
error("Didn't expect flow_file")
end
end
)";

  const auto processor = std::make_shared<ExecuteScript>("ExecuteScript");
  minifi::test::SingleProcessorTestController test_harness{processor};
  LogTestController::getInstance().setTrace<ExecuteScript>();

  processor->setProperty(ExecuteScript::ScriptEngine, "lua");
  processor->setProperty(ExecuteScript::ScriptBody, lua_script);

  auto outputs = test_harness.trigger();

  REQUIRE(outputs.at(ExecuteScript::Success).empty());
  REQUIRE(outputs.at(ExecuteScript::Failure).empty());
}
// A Lua script that only calls log:info(): the message must show up in the
// captured log under the ExecuteScript logger, and no flow file may be routed.
TEST_CASE("Lua: Test Log", "[executescriptLuaLog]") {
  const char* const lua_script = R"(
function onTrigger(context, session)
log:info('hello from lua')
end
)";

  auto& log_controller = LogTestController::getInstance();
  log_controller.reset();

  const auto processor = std::make_shared<ExecuteScript>("ExecuteScript");
  minifi::test::SingleProcessorTestController test_harness{processor};
  LogTestController::getInstance().setTrace<ExecuteScript>();

  processor->setProperty(ExecuteScript::ScriptEngine, "lua");
  processor->setProperty(ExecuteScript::ScriptBody, lua_script);

  auto outputs = test_harness.trigger();

  REQUIRE(outputs.at(ExecuteScript::Success).empty());
  REQUIRE(outputs.at(ExecuteScript::Failure).empty());
  REQUIRE(LogTestController::getInstance().contains("[org::apache::nifi::minifi::processors::ExecuteScript] [info] hello from lua"));
}
// The Lua script reads the incoming flow file through a read callback and
// transfers it to REL_SUCCESS. The test asserts that exactly one flow file
// arrives on Success and that its content is still "tempFile".
// NOTE(review): the 'file content: ...' line the script logs is not asserted here.
TEST_CASE("Lua: Test Read File", "[executescriptLuaRead]") {
  const char* const lua_script = R"(
read_callback = {}
function read_callback.process(self, input_stream)
content = input_stream:read(0)
log:info('file content: ' .. content)
return #content
end
function onTrigger(context, session)
flow_file = session:get()
if flow_file ~= nil then
log:info('got flow file: ' .. flow_file:getAttribute('filename'))
session:read(flow_file, read_callback)
session:transfer(flow_file, REL_SUCCESS)
end
end
)";

  const auto processor = std::make_shared<ExecuteScript>("ExecuteScript");
  minifi::test::SingleProcessorTestController test_harness{processor};
  LogTestController::getInstance().setTrace<ExecuteScript>();

  processor->setProperty(ExecuteScript::ScriptEngine, "lua");
  processor->setProperty(ExecuteScript::ScriptBody, lua_script);

  auto outputs = test_harness.trigger("tempFile");

  REQUIRE(outputs.at(ExecuteScript::Success).size() == 1);
  CHECK(test_harness.plan->getContent(outputs.at(ExecuteScript::Success)[0]) == "tempFile");
}
// The Lua script overwrites the incoming flow file via a write callback and
// transfers it to REL_SUCCESS. The single flow file on Success must carry the
// new content "hello 2" instead of the original "tempFile".
TEST_CASE("Lua: Test Write File", "[executescriptLuaWrite]") {
  const char* const lua_script = R"(
write_callback = {}
function write_callback.process(self, output_stream)
new_content = 'hello 2'
output_stream:write(new_content)
return #new_content
end
function onTrigger(context, session)
flow_file = session:get()
if flow_file ~= nil then
log:info('got flow file: ' .. flow_file:getAttribute('filename'))
session:write(flow_file, write_callback)
session:transfer(flow_file, REL_SUCCESS)
end
end
)";

  const auto processor = std::make_shared<ExecuteScript>("ExecuteScript");
  minifi::test::SingleProcessorTestController test_harness{processor};
  LogTestController::getInstance().setTrace<ExecuteScript>();

  processor->setProperty(ExecuteScript::ScriptEngine, "lua");
  processor->setProperty(ExecuteScript::ScriptBody, lua_script);

  auto outputs = test_harness.trigger("tempFile");

  REQUIRE(outputs.at(ExecuteScript::Success).size() == 1);
  CHECK(test_harness.plan->getContent(outputs.at(ExecuteScript::Success)[0]) == "hello 2");
}
// Triggered without input, the Lua script creates a flow file with
// session:create(nil), logs its filename and transfers it to REL_SUCCESS.
// Expected: one flow file on Success, none on Failure, and the log line present.
TEST_CASE("Lua: Test Create", "[executescriptLuaCreate]") {
  const char* const lua_script = R"(
function onTrigger(context, session)
flow_file = session:create(nil)
if flow_file ~= nil then
log:info('created flow file: ' .. flow_file:getAttribute('filename'))
session:transfer(flow_file, REL_SUCCESS)
end
end
)";

  LogTestController::getInstance().reset();

  const auto processor = std::make_shared<ExecuteScript>("ExecuteScript");
  minifi::test::SingleProcessorTestController test_harness{processor};
  LogTestController::getInstance().setTrace<ExecuteScript>();

  processor->setProperty(ExecuteScript::ScriptEngine, "lua");
  processor->setProperty(ExecuteScript::ScriptBody, lua_script);

  auto outputs = test_harness.trigger();

  REQUIRE(outputs.at(ExecuteScript::Success).size() == 1);
  REQUIRE(outputs.at(ExecuteScript::Failure).empty());
  REQUIRE(LogTestController::getInstance().contains("[info] created flow file:"));
}
// The Lua script adds the attribute test_attr='1', reads it back as a number,
// then updates it to that number plus one. The flow file on Success must keep
// its content ("tempFile") and carry test_attr == "2".
TEST_CASE("Lua: Test Update Attribute", "[executescriptLuaUpdateAttribute]") {
  const char* const lua_script = R"(
function onTrigger(context, session)
flow_file = session:get()
if flow_file ~= nil then
log:info('got flow file: ' .. flow_file:getAttribute('filename'))
flow_file:addAttribute('test_attr', '1')
attr = tonumber(flow_file:getAttribute('test_attr'))
log:info('got flow file attr \'test_attr\': ' .. tostring(attr))
flow_file:updateAttribute('test_attr', tostring(attr + 1))
session:transfer(flow_file, REL_SUCCESS)
end
end
)";

  LogTestController::getInstance().reset();

  const auto processor = std::make_shared<ExecuteScript>("ExecuteScript");
  minifi::test::SingleProcessorTestController test_harness{processor};
  LogTestController::getInstance().setTrace<ExecuteScript>();

  processor->setProperty(ExecuteScript::ScriptEngine, "lua");
  processor->setProperty(ExecuteScript::ScriptBody, lua_script);

  auto outputs = test_harness.trigger("tempFile");

  REQUIRE(outputs.at(ExecuteScript::Success).size() == 1);
  CHECK(test_harness.plan->getContent(outputs.at(ExecuteScript::Success)[0]) == "tempFile");
  CHECK(outputs.at(ExecuteScript::Success)[0]->getAttribute("test_attr") == "2");
}
// The Lua script `require`s a set of library modules (os, coroutine, math, io,
// string, table, package) at load time and then logs 'OK'. The test passes
// when running the session does not throw and the 'OK' line was logged.
TEST_CASE("Lua: Test Require", "[executescriptLuaRequire]") {
  TestController test_controller;

  // Single handle to the log controller, reused for setup, assertion and cleanup.
  LogTestController& log_test_controller = LogTestController::getInstance();
  log_test_controller.setDebug<TestPlan>();
  log_test_controller.setDebug<ExecuteScript>();

  auto plan = test_controller.createPlan();
  auto execute_script = plan->addProcessor("ExecuteScript", "executeScript");

  plan->setProperty(execute_script, ExecuteScript::ScriptEngine, "lua");
  plan->setProperty(execute_script, ExecuteScript::ScriptBody, R"(
require 'os'
require 'coroutine'
require 'math'
require 'io'
require 'string'
require 'table'
require 'package'
log:info('OK')
function onTrigger(context, session)
end
)");

  REQUIRE_NOTHROW(test_controller.runSession(plan, false));
  REQUIRE(log_test_controller.contains("[info] OK"));

  // Clear the captured log output once the assertions have passed.
  log_test_controller.reset();
}
// Runs foo_bar_processor.lua from <executable dir>/resources/test_lua_scripts
// with a Module Directory value made of two comma-separated entries: the path
// .../foo_modules/foo.lua and the path .../bar_modules.
// Expected: one flow file on Success, none on Failure, and "foobar" in the log
// (presumably emitted by the script via the loaded modules - the script itself
// is not part of this file).
TEST_CASE("Lua: Test Module Directory property", "[executescriptLuaModuleDirectoryProperty]") {
  const auto execute_script = std::make_shared<ExecuteScript>("ExecuteScript");
  minifi::test::SingleProcessorTestController controller{execute_script};
  LogTestController::getInstance().setTrace<ExecuteScript>();

  const auto script_files_directory = minifi::utils::file::FileUtils::get_executable_dir() / "resources" / "test_lua_scripts";

  // Build the comma-separated Module Directory value from its two entries.
  const std::string foo_module_path = (script_files_directory / "foo_modules" / "foo.lua").string();
  const std::string bar_modules_path = (script_files_directory / "bar_modules").string();
  const std::string module_directory = foo_module_path + "," + bar_modules_path;

  execute_script->setProperty(ExecuteScript::ScriptEngine, "lua");
  execute_script->setProperty(ExecuteScript::ScriptFile, (script_files_directory / "foo_bar_processor.lua").string());
  execute_script->setProperty(ExecuteScript::ModuleDirectory, module_directory);

  auto result = controller.trigger("tempFile");

  REQUIRE(result.at(ExecuteScript::Success).size() == 1);
  REQUIRE(result.at(ExecuteScript::Failure).empty());
  REQUIRE(LogTestController::getInstance().contains("foobar"));
}
// Script File is set to "/tmp/non-existent-file"; triggering the processor is
// expected to throw minifi::Exception.
TEST_CASE("Lua: Non existent script file should throw", "[executescriptLuaNonExistentScriptFile]") {
  const auto processor = std::make_shared<ExecuteScript>("ExecuteScript");
  minifi::test::SingleProcessorTestController test_harness{processor};
  LogTestController::getInstance().setTrace<ExecuteScript>();

  processor->setProperty(ExecuteScript::ScriptEngine, "lua");
  processor->setProperty(ExecuteScript::ScriptFile, "/tmp/non-existent-file");

  REQUIRE_THROWS_AS(test_harness.trigger("tempFile"), minifi::Exception);
}
// The Lua script fetches the incoming flow file and calls session:remove() on
// it, so neither Success nor Failure may receive anything.
TEST_CASE("Lua can remove flowfiles", "[ExecuteScript]") {
  const char* const lua_script = R"(
function onTrigger(context, session)
flow_file = session:get()
session:remove(flow_file)
end
)";

  const auto processor = std::make_shared<ExecuteScript>("ExecuteScript");
  minifi::test::SingleProcessorTestController test_harness{processor};
  LogTestController::getInstance().setTrace<ExecuteScript>();

  processor->setProperty(ExecuteScript::ScriptEngine, "lua");
  processor->setProperty(ExecuteScript::ScriptBody, lua_script);

  auto outputs = test_harness.trigger("hello");

  REQUIRE(outputs.at(ExecuteScript::Success).empty());
  REQUIRE(outputs.at(ExecuteScript::Failure).empty());
}
// The Lua script keeps a counter under the key 'lua_trigger_count' in the
// state obtained from context:getStateManager(): it starts the counter at 0
// when no state exists, logs the current value, then stores the value plus one.
// The processor is triggered four times and, after each trigger, the log must
// contain the counter value matching the zero-based trigger index.
TEST_CASE("Lua can store states in StateManager", "[ExecuteScript]") {
  const auto execute_script = std::make_shared<ExecuteScript>("ExecuteScript");
  minifi::test::SingleProcessorTestController controller{execute_script};
  LogTestController::getInstance().setTrace<ExecuteScript>();

  execute_script->setProperty(ExecuteScript::ScriptEngine, "lua");
  execute_script->setProperty(ExecuteScript::ScriptBody,
      R"(
function onTrigger(context, session)
state_manager = context:getStateManager()
state = state_manager:get()
if state == nil then
state = {}
state['lua_trigger_count'] = 0
end
lua_trigger_count = state['lua_trigger_count']
log:info('lua_trigger_count: ' .. lua_trigger_count)
state['lua_trigger_count'] = tostring(tonumber(lua_trigger_count) + 1)
state_manager:set(state)
end
)");

  constexpr size_t trigger_count = 4;
  for (size_t i = 0; i < trigger_count; ++i) {
    controller.trigger();
    // Built with <string> only, so the test does not depend on a formatting
    // header that this file never includes directly.
    const std::string expected_log_line = "lua_trigger_count: " + std::to_string(i);
    CHECK(LogTestController::getInstance().contains(expected_log_line));
  }
}
} // namespace org::apache::nifi::minifi::processors::test