blob: 15b47019fd81587f5ead6a97e9ea64f1cab3c947 [file] [log] [blame]
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include <vector>
#include <utility>
#include <algorithm>
#include <string>
#include <memory>
#include <filesystem>
#include "core/Core.h"
#include "core/logging/LoggerConfiguration.h"
#include "core/Resource.h"
#include "ExecutePythonProcessor.h"
#include "PythonConfigState.h"
#include "PythonObjectFactory.h"
#include "agent/agent_version.h"
#include "agent/build_description.h"
#include "utils/file/FileUtils.h"
#include "utils/StringUtils.h"
#include "range/v3/algorithm.hpp"
#include "properties/Configuration.h"
#include "utils/file/FilePattern.h"
#include "range/v3/view/filter.hpp"
#include "PythonDependencyInstaller.h"
#include "utils/file/PathUtils.h"
namespace org::apache::nifi::minifi::extensions::python {
/**
 * Loads python processors from the directories named by the
 * `nifi_python_processor_dir` configuration property.
 *
 * For every `.py` script found, a PythonObjectFactory is registered with the
 * class loader and a ClassDescription is published through
 * ExternalBuildDescription. Every class registered by an instance is
 * unregistered again when that instance is destroyed.
 */
class PythonCreator : public minifi::core::CoreComponent {
 public:
  explicit PythonCreator(std::string name, const utils::Identifier &uuid = {})
      : minifi::core::CoreComponent(std::move(name), uuid) {
  }

  /** Unregisters every class this instance registered in configure(). */
  ~PythonCreator() override {
    for (const auto& clazz : registered_classes_) {
      core::getClassLoader().unregisterClass(clazz);
    }
  }

  /**
   * Installs python dependencies, then discovers and registers all python processors.
   *
   * Returns right after the dependency installation step when the
   * `nifi_python_processor_dir` property is not set.
   *
   * @param configuration agent configuration the python related properties are read from
   */
  void configure(const std::shared_ptr<Configure> &configuration) override {
    // NOTE(review): `configuration` is treated as nullable a few lines below but is handed to the
    // installer unchecked here - confirm PythonDependencyInstaller tolerates a null pointer.
    PythonDependencyInstaller dependency_installer(configuration);
    dependency_installer.installDependenciesFromRequirementsFiles();
    // NOTE(review): `engine` is never used afterwards; presumably it is constructed for a side
    // effect of the PythonScriptEngine constructor - confirm before removing.
    auto engine = std::make_shared<python::PythonScriptEngine>();
    std::optional<std::string> pathListings = configuration ? configuration->get(minifi::Configuration::nifi_python_processor_dir) : std::nullopt;
    if (!pathListings) {
      return;
    }
    const std::string& processor_dir = *pathListings;
    configure({processor_dir});

    // NOTE(review): the private configure() overload splits the property on ',', yet the unsplit
    // value is used below both as the package base path and as a module search path. With more
    // than one directory configured getPackage() finds no prefix match and yields an empty
    // package - confirm whether multiple directories are meant to be supported.
    const auto python_lib_path = getPythonLibPath(configuration);
    const std::filesystem::path processor_dir_path{processor_dir};
    // Loop-invariant marker of the NiFi helper directory, built with the platform's separator.
    const auto nifi_utils_dir = (std::filesystem::path("nifi_python_processors") / "utils").string();

    for (const auto &path : classpaths_) {
      const auto path_str = path.string();
      const auto script_name = path.stem();
      const auto script_name_str = script_name.string();
      const auto package = getPackage(processor_dir, path_str);

      // Scripts directly under the base directory are registered by bare script name,
      // scripts in sub-directories by their fully qualified name.
      std::string class_name = script_name_str;
      std::string full_name = "org.apache.nifi.minifi.processors." + script_name_str;
      if (!package.empty()) {
        full_name = utils::string::join_pack("org.apache.nifi.minifi.processors.", package, ".", script_name_str);
        class_name = full_name;
      }

      if (path_str.find("nifi_python_processors") != std::string::npos) {
        // Files under nifi_python_processors/utils are skipped, they are not registered as processors.
        if (path_str.find(nifi_utils_dir) != std::string::npos) {
          continue;
        }
        dependency_installer.installInlinePythonDependencies(path);
        logger_->log_info("Registering NiFi python processor: {}", class_name);
        core::getClassLoader().registerClass(class_name, std::make_unique<PythonObjectFactory>(path_str, script_name_str,
            PythonProcessorType::NIFI_TYPE, std::vector<std::filesystem::path>{python_lib_path, processor_dir_path, path.parent_path()}));
      } else {
        logger_->log_info("Registering MiNiFi python processor: {}", class_name);
        core::getClassLoader().registerClass(class_name, std::make_unique<PythonObjectFactory>(path_str, script_name_str,
            PythonProcessorType::MINIFI_TYPE, std::vector<std::filesystem::path>{python_lib_path, processor_dir_path}));
      }
      registered_classes_.push_back(class_name);

      // A script whose description cannot be produced stays registered; the failure is only logged.
      try {
        registerScriptDescription(class_name, full_name, path, script_name_str);
      } catch (const std::exception &err) {
        logger_->log_error("Cannot load {}: {}", script_name, err.what());
      }
    }
  }

 private:
  /**
   * Instantiates the freshly registered processor once and publishes its
   * description, properties and relationships as an external component.
   *
   * @param class_name  name the processor was registered under in the class loader
   * @param full_name   fully qualified name stored in the description
   * @param path        script path; its file name becomes the bundle artifact
   * @param script_name short name stored in the description
   */
  void registerScriptDescription(const std::string& class_name, const std::string& full_name, const std::filesystem::path& path, const std::string& script_name) {
    auto processor = core::ClassLoader::getDefaultClassLoader().instantiate<python::processors::ExecutePythonProcessor>(class_name, utils::IdGenerator::getIdGenerator()->generate());
    if (!processor) {
      logger_->log_error("Couldn't instantiate '{}' python processor", class_name);
      return;
    }
    processor->initialize();

    minifi::BundleDetails details;
    details.artifact = path.filename().string();
    details.version = minifi::AgentBuild::VERSION;
    details.group = "python";

    minifi::ClassDescription description{
      .type_ = ResourceType::Processor,
      .short_name_ = script_name,
      .full_name_ = full_name,
      .description_ = processor->getDescription(),
      .class_properties_ = processor->getPythonProperties(),
      .class_relationships_ = processor->getPythonRelationships(),
      .supports_dynamic_properties_ = processor->getPythonSupportDynamicProperties(),
      .inputRequirement_ = toString(processor->getInputRequirement()),
      .isSingleThreaded_ = processor->isSingleThreaded()};

    minifi::ExternalBuildDescription::addExternalComponent(details, description);
  }

  /**
   * Collects the `.py` files reachable from the given entries into classpaths_.
   *
   * Each entry may itself be a comma separated list. Paths containing
   * "nifiapi" or "__init__" are dropped from the result.
   */
  void configure(const std::vector<std::string> &pythonFiles) {
    std::vector<std::string> pathOrFiles;
    for (const auto &path : pythonFiles) {
      const auto vec = utils::string::split(path, ",");
      pathOrFiles.insert(pathOrFiles.end(), vec.begin(), vec.end());
    }
    for (const auto &path : pathOrFiles) {
      utils::file::addFilesMatchingExtension(logger_, path, ".py", classpaths_);
    }
    classpaths_ = classpaths_
        | ranges::views::filter([] (auto& path) { return path.string().find("nifiapi") == std::string::npos && path.string().find("__init__") == std::string::npos; })
        | ranges::to<std::vector<std::filesystem::path>>();
  }

  /**
   * Derives the lower-cased, dot separated package of a script from its
   * directory relative to the base path.
   *
   * @param basePath     directory the package is computed relative to
   * @param pythonscript path of the script
   * @return the package name, or an empty string when `pythonscript` does not
   *         start with `basePath` or the script sits directly in `basePath`
   */
  std::string getPackage(const std::string &basePath, const std::string &pythonscript) {
    if (!minifi::utils::string::startsWith(pythonscript, basePath)) {
      return "";
    }
    const auto python_package_path = std::filesystem::relative(pythonscript, basePath).parent_path();
    std::vector<std::string> path_elements;
    path_elements.reserve(std::distance(python_package_path.begin(), python_package_path.end()));
    std::transform(python_package_path.begin(), python_package_path.end(), std::back_inserter(path_elements), [](const auto& path) { return path.string(); });
    std::string python_package = minifi::utils::string::join(".", path_elements);
    // Drop one leading '.' if the joined name begins with one.
    if (python_package.length() > 1 && python_package.at(0) == '.') {
      python_package = python_package.substr(1);
    }
    // tolower() is only defined for values representable as unsigned char (or EOF), so every
    // character goes through unsigned char first; a plain char may be negative for non-ASCII bytes.
    ranges::transform(python_package, python_package.begin(), [](unsigned char ch) { return static_cast<char>(::tolower(ch)); });
    return python_package;
  }

  /**
   * Locates the directory holding the python extension library.
   *
   * The `nifi_extension_path` property (default "../extensions/*") is used as
   * a file pattern; the parent directory of the first match whose path
   * contains "python" is returned.
   *
   * @param configuration agent configuration; dereferenced unconditionally, so it must not be null
   * @return the directory found, or an empty path when no match contains "python"
   */
  std::filesystem::path getPythonLibPath(const std::shared_ptr<Configure>& configuration) {
    constexpr const char* DEFAULT_EXTENSION_PATH = "../extensions/*";
    std::string pattern = [&] {
      auto opt_pattern = configuration->get(minifi::Configuration::nifi_extension_path);
      if (!opt_pattern) {
        logger_->log_warn("No extension path is provided, using default: '{}'", DEFAULT_EXTENSION_PATH);
      }
      return opt_pattern.value_or(DEFAULT_EXTENSION_PATH);
    }();
    auto candidates = utils::file::match(utils::file::FilePattern(pattern, [&] (std::string_view subpattern, std::string_view error_msg) {
      logger_->log_error("Error in subpattern '{}': {}", subpattern, error_msg);
    }));
    std::filesystem::path python_lib_path;
    for (const auto& candidate : candidates) {
      if (candidate.string().find("python") != std::string::npos) {
        python_lib_path = candidate.parent_path();
        break;
      }
    }
    return python_lib_path;
  }

  // Names handed to the class loader, kept so the destructor can unregister them.
  std::vector<std::string> registered_classes_;
  // Python scripts discovered by the private configure() overload.
  std::vector<std::filesystem::path> classpaths_;
  std::shared_ptr<core::logging::Logger> logger_ = core::logging::LoggerFactory<PythonCreator>::getLogger();
};
} // namespace org::apache::nifi::minifi::extensions::python