blob: 256ceee0ae0f0f7bfe7eb3cf22f2bf85dce620a2 [file] [log] [blame]
/**
* @file ExecuteScript.h
* ExecuteScript class declaration
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include <string>
#include <memory>
#include <utility>
#include <optional>
#include "concurrentqueue.h"
#include "core/Processor.h"
#include "ScriptEngine.h"
#include "ScriptProcessContext.h"
#include "utils/Enum.h"
#include "utils/ResourceQueue.h"
#ifdef LUA_SUPPORT
#include "lua/LuaScriptEngine.h"
#endif // LUA_SUPPORT
#ifdef PYTHON_SUPPORT
#include "python/PythonScriptEngine.h"
#endif // PYTHON_SUPPORT
namespace org::apache::nifi::minifi::processors {
class ScriptEngineFactory {
public:
ScriptEngineFactory(const core::Relationship& success, const core::Relationship& failure, std::shared_ptr<core::logging::Logger> logger);
template<typename T>
std::enable_if_t<std::is_base_of_v<script::ScriptEngine, T>, std::unique_ptr<T>> createEngine() const {
auto engine = std::make_unique<T>();
engine->bind("log", logger_);
engine->bind("REL_SUCCESS", success_);
engine->bind("REL_FAILURE", failure_);
return engine;
}
private:
const core::Relationship& success_;
const core::Relationship& failure_;
std::shared_ptr<core::logging::Logger> logger_;
};
class ExecuteScript : public core::Processor {
public:
SMART_ENUM(ScriptEngineOption,
(LUA, "lua"),
(PYTHON, "python")
)
explicit ExecuteScript(std::string name, const utils::Identifier &uuid = {})
: Processor(std::move(name), uuid),
engine_factory_(Success, Failure, logger_) {
}
EXTENSIONAPI static constexpr const char* Description = "Executes a script given the flow file and a process session. "
"The script is responsible for handling the incoming flow file (transfer to SUCCESS or remove, e.g.) as well as "
"any flow files created by the script. If the handling is incomplete or incorrect, the session will be rolled back.Scripts must define an onTrigger function which accepts NiFi Context "
"and Property objects. For efficiency, scripts are executed once when the processor is run, then the onTrigger method is called for each incoming flowfile. This enables scripts to keep state "
"if they wish, although there will be a script context per concurrent task of the processor. In order to, e.g., compute an arithmetic sum based on incoming flow file information, set the "
"concurrent tasks to 1.";
EXTENSIONAPI static const core::Property ScriptEngine;
EXTENSIONAPI static const core::Property ScriptFile;
EXTENSIONAPI static const core::Property ScriptBody;
EXTENSIONAPI static const core::Property ModuleDirectory;
static auto properties() {
return std::array{
ScriptEngine,
ScriptFile,
ScriptBody,
ModuleDirectory
};
}
EXTENSIONAPI static const core::Relationship Success;
EXTENSIONAPI static const core::Relationship Failure;
static auto relationships() { return std::array{Success, Failure}; }
EXTENSIONAPI static constexpr bool SupportsDynamicProperties = false;
EXTENSIONAPI static constexpr bool SupportsDynamicRelationships = false;
EXTENSIONAPI static constexpr core::annotation::Input InputRequirement = core::annotation::Input::INPUT_ALLOWED;
EXTENSIONAPI static constexpr bool IsSingleThreaded = false;
ADD_COMMON_VIRTUAL_FUNCTIONS_FOR_PROCESSORS
void initialize() override;
void onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory) override;
void onTrigger(core::ProcessContext* /*context*/, core::ProcessSession* /*session*/) override {
logger_->log_error("onTrigger invocation with raw pointers is not implemented");
}
void onTrigger(const std::shared_ptr<core::ProcessContext> &context,
const std::shared_ptr<core::ProcessSession> &session) override;
private:
std::shared_ptr<core::logging::Logger> logger_ = core::logging::LoggerFactory<ExecuteScript>::getLogger();
ScriptEngineOption script_engine_;
std::string script_file_;
std::string script_body_;
std::optional<std::string> module_directory_;
ScriptEngineFactory engine_factory_;
#ifdef LUA_SUPPORT
std::shared_ptr<utils::ResourceQueue<lua::LuaScriptEngine>> lua_script_engine_queue_;
#endif // LUA_SUPPORT
#ifdef PYTHON_SUPPORT
std::unique_ptr<python::PythonScriptEngine> python_script_engine_;
#endif // PYTHON_SUPPORT
};
} // namespace org::apache::nifi::minifi::processors