| /** |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| #define CUSTOM_EXTENSION_INIT |
| |
| #include <fstream> |
| |
| #include "minifi-cpp/agent/agent_version.h" |
| #include "unit/TestBase.h" |
| #include "unit/Catch.h" |
| #include "unit/TestControllerWithFlow.h" |
| #include "unit/EmptyFlow.h" |
| #include "unit/TestUtils.h" |
| #include "c2/C2MetricsPublisher.h" |
| #include "minifi-cpp/utils/gsl.h" |
| |
| using minifi::state::response::SerializedResponseNode; |
| using minifi::state::response::ValueNode; |
| |
| template<typename F> |
| const SerializedResponseNode* findNode(const std::vector<SerializedResponseNode>& nodes, F filter) { |
| for (auto& node : nodes) { |
| if (filter(node)) { |
| return &node; |
| } |
| } |
| return nullptr; |
| } |
| |
| const SerializedResponseNode& getNode(const std::vector<SerializedResponseNode>& nodes, std::string_view name) { |
| for (auto& node : nodes) { |
| if (node.name == name) return node; |
| } |
| FAIL(fmt::format("Node {} was not found", name)); |
| gsl_FailFast(); |
| } |
| |
| ValueNode getNthAllowableValue(const SerializedResponseNode& node, size_t n) { |
| return getNode(node.children[n].children, "value").value; |
| } |
| |
| TEST_CASE("Python processor's description is part of the manifest") { |
| TestControllerWithFlow controller(empty_flow, false /* DEFER FLOW SETUP */); |
| |
| auto python_dir = controller.createTempDirectory() / "minifi-python"; |
| utils::file::create_dir(python_dir); |
| std::ofstream{python_dir / "MyPyProc.py"} << |
| "def describe(proc):\n" |
| " proc.setDescription('An amazing processor')\n"; |
| |
| std::ofstream{python_dir / "MyPyProc2.py"} << |
| "def describe(proc):\n" |
| " proc.setDescription('Another amazing processor')\n" |
| " proc.setSupportsDynamicProperties()\n" |
| " proc.addProperty('Prop1', 'A great property', 'banana', True, False, False, None, ['apple', 'orange', 'banana', 'durian'], None)\n"; |
| |
| const auto executable_dir = minifi::utils::file::FileUtils::get_executable_dir(); |
| #ifdef WIN32 |
| std::filesystem::create_symlink(executable_dir / "minifi-python-script-extension.dll", python_dir / "minifi_native.pyd"); |
| #endif |
| std::filesystem::copy(executable_dir / "resources" / "minifi-python" / "nifiapi", python_dir / "nifiapi", std::filesystem::copy_options::recursive); |
| utils::file::create_dir(python_dir / "nifi_python_processors"); |
| std::ofstream{python_dir / "nifi_python_processors" / "MyPyProc3.py"} << R"( |
| from nifiapi.flowfiletransform import FlowFileTransform, FlowFileTransformResult |
| from nifiapi.properties import ExpressionLanguageScope, PropertyDescriptor, StandardValidators |
| |
| class MyPyProc3(FlowFileTransform): |
| |
| class Java: |
| implements = ['org.apache.nifi.python.processor.FlowFileTransform'] |
| |
| class ProcessorDetails: |
| version = '1.2.3' |
| description = "Test processor number three." |
| dependencies = [] |
| |
| COLOR = PropertyDescriptor( |
| name="Color", |
| description="Symbolic name for the combination of frequencies of electromagnetic radiation reflected by the processor.", |
| allowable_values=['red', 'blue', 'green', 'purple'], |
| default_value='red', |
| required=True, |
| expression_language_scope=ExpressionLanguageScope.NONE |
| ) |
| |
| MOOD = PropertyDescriptor( |
| name="Mood", |
| description="The mental or emotional state of the processor.", |
| required=False, |
| validators=[StandardValidators.NON_EMPTY_VALIDATOR], |
| expression_language_scope=ExpressionLanguageScope.FLOWFILE_ATTRIBUTES |
| ) |
| |
| def __init__(self, **kwargs): |
| pass |
| |
| def getPropertyDescriptors(self): |
| return [self.COLOR, self.MOOD] |
| |
| def getDynamicPropertyDescriptor(self, propertyname): |
| return PropertyDescriptor(name=propertyname, |
| description="A user-defined property", |
| dynamic=True) |
| |
| def transform(self, context, flow_file): |
| color = context.getProperty(self.COLOR).getValue() |
| mood = context.getProperty(self.MOOD).evaluateAttributeExpressions(flowfile).getValue() or "OK" |
| user = flow_file.getContentsAsBytes().decode('utf-8') |
| response = f"Hello {user}! I am a {color} processor. I am feeling {mood}." |
| return FlowFileTransformResult('success', contents=response.encode('utf-8')) |
| )"; |
| |
| std::ofstream{python_dir / "nifi_python_processors" / "MyPyProc4.py"} << R"( |
| from nifiapi.flowfiletransform import FlowFileTransform, FlowFileTransformResult |
| |
| class MyPyProc4(FlowFileTransform): |
| |
| class Java: |
| implements = ['org.apache.nifi.python.processor.FlowFileTransform'] |
| |
| class ProcessorDetails: |
| description = "Test processor number four. Does not define a version." |
| dependencies = [] |
| |
| def __init__(self, **kwargs): |
| pass |
| |
| def transform(self, context, flow_file): |
| return FlowFileTransformResult('success') |
| )"; |
| |
| std::ofstream{python_dir / "nifi_python_processors" / "MyPyProc5.py"} << R"( |
| from nifiapi.flowfiletransform import FlowFileTransform, FlowFileTransformResult |
| |
| class MyPyProc5(FlowFileTransform): |
| |
| class Java: |
| implements = ['org.apache.nifi.python.processor.FlowFileTransform'] |
| |
| class ProcessorDetails: |
| version = '' |
| description = "Test processor number five. Defines a version, but it is blank." |
| dependencies = [] |
| |
| def __init__(self, **kwargs): |
| pass |
| |
| def transform(self, context, flow_file): |
| return FlowFileTransformResult('success') |
| )"; |
| |
| controller.configuration_->set(minifi::Configuration::nifi_python_processor_dir, python_dir.string()); |
| #ifndef WIN32 |
| controller.configuration_->set(minifi::Configuration::nifi_extension_path, "*minifi-python-lib-loader*, *minifi-python-script*"); |
| #else |
| controller.configuration_->set(minifi::Configuration::nifi_extension_path, "*minifi-python-script*"); |
| #endif |
| |
| core::extension::ExtensionManager extension_manager(controller.configuration_); |
| |
| controller.setupFlow(); |
| |
| auto c2_metrics_publisher = std::dynamic_pointer_cast<minifi::c2::C2MetricsPublisher>(controller.metrics_publisher_store_->getMetricsPublisher(minifi::c2::C2_METRICS_PUBLISHER).lock()); |
| |
| auto agent_info = c2_metrics_publisher->getAgentManifest(); |
| |
| auto& manifest = getNode(agent_info.serialized_nodes, "agentManifest"); |
| |
| auto findPythonBundle = [&](const std::string& name) { |
| // each python file gets its own bundle |
| auto* python_bundle = findNode(manifest.children, [&](const auto& child) { |
| return child.name == "bundles" && findNode(child.children, [&](const auto& bundle_child) { |
| return bundle_child.name == "artifact" && bundle_child.value == name + ".py"; |
| }); |
| }); |
| REQUIRE(python_bundle); |
| return gsl::make_not_null(python_bundle); |
| }; |
| |
| auto getProcessorNode = [&] (gsl::not_null<const SerializedResponseNode*> bundle) { |
| auto& processors = getNode(getNode(bundle->children, "componentManifest").children, "processors"); |
| REQUIRE(processors.children.size() == 1); |
| auto& only_child = processors.children[0]; |
| return gsl::make_not_null(&only_child); |
| }; |
| |
| { |
| auto python_bundle = findPythonBundle("MyPyProc"); |
| auto MyPyProc = getProcessorNode(python_bundle); |
| |
| CHECK(getNode(MyPyProc->children, "inputRequirement").value == "INPUT_ALLOWED"); |
| CHECK(getNode(MyPyProc->children, "isSingleThreaded").value == true); |
| CHECK(getNode(MyPyProc->children, "typeDescription").value == "An amazing processor"); |
| CHECK(getNode(MyPyProc->children, "supportsDynamicRelationships").value == false); |
| CHECK(getNode(MyPyProc->children, "supportsDynamicProperties").value == false); |
| CHECK(getNode(MyPyProc->children, "type").value == "org.apache.nifi.minifi.processors.MyPyProc"); |
| |
| auto& rels = getNode(MyPyProc->children, "supportedRelationships").children; |
| REQUIRE(rels.size() == 3); |
| |
| auto* success = findNode(rels, [] (auto& rel) {return getNode(rel.children, "name").value == "success";}); |
| REQUIRE(success); |
| CHECK(getNode(success->children, "description").value == "Script succeeds"); |
| |
| auto* failure = findNode(rels, [] (auto& rel) {return getNode(rel.children, "name").value == "failure";}); |
| REQUIRE(failure); |
| REQUIRE(getNode(failure->children, "description").value == "Script fails"); |
| } |
| |
| { |
| auto python_bundle = findPythonBundle("MyPyProc2"); |
| auto MyPyProc2 = getProcessorNode(python_bundle); |
| |
| CHECK(getNode(MyPyProc2->children, "inputRequirement").value == "INPUT_ALLOWED"); |
| CHECK(getNode(MyPyProc2->children, "isSingleThreaded").value == true); |
| CHECK(getNode(MyPyProc2->children, "typeDescription").value == "Another amazing processor"); |
| CHECK(getNode(MyPyProc2->children, "supportsDynamicRelationships").value == false); |
| CHECK(getNode(MyPyProc2->children, "supportsDynamicProperties").value == true); |
| CHECK(getNode(MyPyProc2->children, "type").value == "org.apache.nifi.minifi.processors.MyPyProc2"); |
| |
| auto& properties = getNode(MyPyProc2->children, "propertyDescriptors").children; |
| REQUIRE(properties.size() == 1); |
| CHECK(properties[0].name == "Prop1"); |
| CHECK(getNode(properties[0].children, "name").value == "Prop1"); |
| CHECK(getNode(properties[0].children, "required").value == true); |
| CHECK(getNode(properties[0].children, "expressionLanguageScope").value == "NONE"); |
| CHECK(getNode(properties[0].children, "defaultValue").value == "banana"); |
| auto& allowable_values = getNode(properties[0].children, "allowableValues"); |
| REQUIRE(allowable_values.children.size() == 4); |
| CHECK(getNthAllowableValue(allowable_values, 0) == "apple"); |
| CHECK(getNthAllowableValue(allowable_values, 1) == "orange"); |
| CHECK(getNthAllowableValue(allowable_values, 2) == "banana"); |
| CHECK(getNthAllowableValue(allowable_values, 3) == "durian"); |
| |
| auto& rels = getNode(MyPyProc2->children, "supportedRelationships").children; |
| REQUIRE(rels.size() == 3); |
| |
| auto* success = findNode(rels, [] (auto& rel) {return getNode(rel.children, "name").value == "success";}); |
| REQUIRE(success); |
| CHECK(getNode(success->children, "description").value == "Script succeeds"); |
| |
| auto* failure = findNode(rels, [] (auto& rel) {return getNode(rel.children, "name").value == "failure";}); |
| REQUIRE(failure); |
| CHECK(getNode(failure->children, "description").value == "Script fails"); |
| } |
| |
| { |
| auto python_bundle = findPythonBundle("MyPyProc3"); |
| auto MyPyProc3 = getProcessorNode(python_bundle); |
| |
| CHECK(getNode(python_bundle->children, "version").value == "1.2.3"); |
| |
| CHECK(getNode(MyPyProc3->children, "inputRequirement").value == "INPUT_ALLOWED"); |
| CHECK(getNode(MyPyProc3->children, "isSingleThreaded").value == true); |
| CHECK(getNode(MyPyProc3->children, "typeDescription").value == "Test processor number three."); |
| CHECK(getNode(MyPyProc3->children, "supportsDynamicRelationships").value == false); |
| CHECK(getNode(MyPyProc3->children, "supportsDynamicProperties").value == true); |
| CHECK(getNode(MyPyProc3->children, "type").value == "org.apache.nifi.minifi.processors.nifi_python_processors.MyPyProc3"); |
| |
| auto& properties = getNode(MyPyProc3->children, "propertyDescriptors").children; |
| REQUIRE(properties.size() == 2); |
| CHECK(properties[0].name == "Color"); |
| CHECK(getNode(properties[0].children, "name").value == "Color"); |
| CHECK(getNode(properties[0].children, "required").value == true); |
| CHECK(getNode(properties[0].children, "expressionLanguageScope").value == "NONE"); |
| CHECK(getNode(properties[0].children, "defaultValue").value == "red"); |
| CHECK(properties[1].name == "Mood"); |
| CHECK(getNode(properties[1].children, "name").value == "Mood"); |
| CHECK(getNode(properties[1].children, "required").value == false); |
| CHECK(getNode(properties[1].children, "expressionLanguageScope").value == "FLOWFILE_ATTRIBUTES"); |
| |
| auto& rels = getNode(MyPyProc3->children, "supportedRelationships").children; |
| REQUIRE(rels.size() == 3); |
| |
| auto* success = findNode(rels, [] (auto& rel) {return getNode(rel.children, "name").value == "success";}); |
| REQUIRE(success); |
| CHECK(getNode(success->children, "description").value == "Script succeeds"); |
| |
| auto* failure = findNode(rels, [] (auto& rel) {return getNode(rel.children, "name").value == "failure";}); |
| REQUIRE(failure); |
| CHECK(getNode(failure->children, "description").value == "Script fails"); |
| |
| auto* original = findNode(rels, [] (auto& rel) {return getNode(rel.children, "name").value == "original";}); |
| REQUIRE(original); |
| CHECK(getNode(original->children, "description").value == "Original flow file"); |
| } |
| |
| { |
| auto python_bundle = findPythonBundle("MyPyProc4"); |
| CHECK(getNode(python_bundle->children, "version").value == minifi::AgentBuild::VERSION); |
| } |
| |
| { |
| auto python_bundle = findPythonBundle("MyPyProc5"); |
| CHECK(getNode(python_bundle->children, "version").value == minifi::AgentBuild::VERSION); |
| } |
| } |