blob: 966f9d81bcb276b93e0b5b27fdf7863402ec3b6f [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
@ENABLE_PYTHON_SCRIPTING
Feature: MiNiFi can use python processors in its flows
Background:
Given the content of "/tmp/output" is monitored
Scenario: A MiNiFi instance can update attributes through custom python processor
Given a GenerateFlowFile processor with the "File Size" property set to "0B"
And a ExecutePythonProcessor processor with the "Script File" property set to "/tmp/resources/python/add_attribute_to_flowfile.py"
And a LogAttribute processor
And the "success" relationship of the GenerateFlowFile processor is connected to the ExecutePythonProcessor
And the "success" relationship of the ExecutePythonProcessor processor is connected to the LogAttribute
When all instances start up
Then the Minifi logs contain the following message: "key:Python attribute value:attributevalue" in less than 60 seconds
Scenario: A MiNiFi instance can update attributes through native python processor
Given the example MiNiFi python processors are present
And a GenerateFlowFile processor with the "File Size" property set to "0B"
And a AddPythonAttribute processor
And a LogAttribute processor
And the "success" relationship of the GenerateFlowFile processor is connected to the AddPythonAttribute
And the "success" relationship of the AddPythonAttribute processor is connected to the LogAttribute
When all instances start up
Then the Minifi logs contain the following message: "key:Python attribute value:attributevalue" in less than 60 seconds
Scenario: A MiNiFi instance can handle dynamic properties through native python processor
Given a GenerateFlowFile processor with the "File Size" property set to "0B"
And a LogDynamicProperties processor with the "Static Property" property set to "static value"
And the "Dynamic Property" property of the LogDynamicProperties processor is set to "dynamic value"
And the "success" relationship of the GenerateFlowFile processor is connected to the LogDynamicProperties
And python processors without dependencies are present on the MiNiFi agent
When all instances start up
Then the Minifi logs contain the following message: "Static Property value: static value" in less than 60 seconds
And the Minifi logs contain the following message: "Dynamic Property value: dynamic value" in less than 60 seconds
And the Minifi logs contain the following message: "dynamic property key count: 1" in less than 60 seconds
And the Minifi logs contain the following message: "dynamic property key: Dynamic Property" in less than 60 seconds
Scenario: Native python processor can read empty input stream
Given the example MiNiFi python processors are present
And a GenerateFlowFile processor with the "File Size" property set to "0B"
And a MoveContentToJson processor
And a PutFile processor with the "Directory" property set to "/tmp/output"
And the "success" relationship of the GenerateFlowFile processor is connected to the MoveContentToJson
And the "success" relationship of the MoveContentToJson processor is connected to the PutFile
When all instances start up
Then a flowfile with the content '{"content": ""}' is placed in the monitored directory in less than 60 seconds
Scenario: FlowFile can be removed from session
Given a GenerateFlowFile processor with the "File Size" property set to "0B"
And a RemoveFlowFile processor
When all instances start up
Then the Minifi logs contain the following message: "Removing flow file with UUID" in less than 30 seconds
Scenario: Native python processors can be stateful
Given the example MiNiFi python processors are present
And a CountingProcessor processor
And the scheduling period of the CountingProcessor processor is set to "100 ms"
And a PutFile processor with the "Directory" property set to "/tmp/output"
And the "success" relationship of the CountingProcessor processor is connected to the PutFile
When all instances start up
Then flowfiles with these contents are placed in the monitored directory in less than 5 seconds: "0,1,2,3,4,5"
@USE_NIFI_PYTHON_PROCESSORS_WITH_LANGCHAIN
Scenario Outline: MiNiFi C++ can use native NiFi python processors
Given a GetFile processor with the "Input Directory" property set to "/tmp/input"
And a file with filename "test_file.log" and content "test_data" is present in "/tmp/input"
And a ParseDocument processor
And a ChunkDocument processor with the "Chunk Size" property set to "5"
And the "Chunk Overlap" property of the ChunkDocument processor is set to "3"
And a PutFile processor with the "Directory" property set to "/tmp/output"
And a LogAttribute processor with the "FlowFiles To Log" property set to "0"
And python with langchain is installed on the MiNiFi agent <python_install_mode>
And the "success" relationship of the GetFile processor is connected to the ParseDocument
And the "success" relationship of the ParseDocument processor is connected to the ChunkDocument
And the "success" relationship of the ChunkDocument processor is connected to the PutFile
And the "success" relationship of the PutFile processor is connected to the LogAttribute
When all instances start up
Then at least one flowfile's content match the following regex: '{"text": "test_", "metadata": {"filename": "test_file.log", "uuid": "", "chunk_index": 0, "chunk_count": 3}}' in less than 30 seconds
And the Minifi logs contain the following message: "key:document.count value:3" in less than 10 seconds
Examples: Different python installation modes
| python_install_mode |
| with required python packages |
| with a pre-created virtualenv |
| with a pre-created virtualenv containing the required python packages |
| using inline defined Python dependencies to install packages |
Scenario: MiNiFi C++ can use native NiFi source python processors
Given a CreateFlowFile processor
And a PutFile processor with the "Directory" property set to "/tmp/output"
And a LogAttribute processor with the "FlowFiles To Log" property set to "0"
And the "space" relationship of the CreateFlowFile processor is connected to the PutFile
And the "success" relationship of the PutFile processor is connected to the LogAttribute
And python processors without dependencies are present on the MiNiFi agent
When the MiNiFi instance starts up
Then a flowfile with the content "Hello World!" is placed in the monitored directory in less than 10 seconds
And the Minifi logs contain the following message: "key:filename value:" in less than 60 seconds
And the Minifi logs contain the following message: "key:type value:space" in less than 60 seconds
Scenario: MiNiFi C++ can use custom relationships in NiFi native python processors
Given a GetFile processor with the "Input Directory" property set to "/tmp/input"
And a file with filename "test_file.log" and content "test_data_one" is present in "/tmp/input"
And a file with filename "test_file2.log" and content "test_data_two" is present in "/tmp/input"
And a file with filename "test_file3.log" and content "test_data_three" is present in "/tmp/input"
And a file with filename "test_file4.log" and content "test_data_four" is present in "/tmp/input"
And a RotatingForwarder processor
And a PutFile processor with the "Directory" property set to "/tmp/output"
And python processors without dependencies are present on the MiNiFi agent
And the "success" relationship of the GetFile processor is connected to the RotatingForwarder
And the "first" relationship of the RotatingForwarder processor is connected to the PutFile
And the "second" relationship of the RotatingForwarder processor is connected to the PutFile
And the "third" relationship of the RotatingForwarder processor is connected to the PutFile
And the "fourth" relationship of the RotatingForwarder processor is connected to the PutFile
When all instances start up
Then flowfiles with these contents are placed in the monitored directory in less than 10 seconds: "test_data_one,test_data_two,test_data_three,test_data_four"
Scenario: MiNiFi C++ can use special property types including controller services in NiFi native python processors
Given a GenerateFlowFile processor with the "File Size" property set to "0B"
And a SpecialPropertyTypeChecker processor
And a PutFile processor with the "Directory" property set to "/tmp/output"
And python processors without dependencies are present on the MiNiFi agent
And a SSL context service is set up for the following processor: "SpecialPropertyTypeChecker"
And the "success" relationship of the GenerateFlowFile processor is connected to the SpecialPropertyTypeChecker
And the "success" relationship of the SpecialPropertyTypeChecker processor is connected to the PutFile
When all instances start up
Then one flowfile with the contents "Check successful!" is placed in the monitored directory in less than 30 seconds
Scenario: NiFi native python processor's ProcessContext interface can be used in MiNiFi C++
Given a GenerateFlowFile processor with the "File Size" property set to "0B"
And a ProcessContextInterfaceChecker processor
And a PutFile processor with the "Directory" property set to "/tmp/output"
And python processors without dependencies are present on the MiNiFi agent
And the "success" relationship of the GenerateFlowFile processor is connected to the ProcessContextInterfaceChecker
And the "myrelationship" relationship of the ProcessContextInterfaceChecker processor is connected to the PutFile
When all instances start up
Then one flowfile with the contents "Check successful!" is placed in the monitored directory in less than 30 seconds
Scenario: NiFi native python processor can update attributes of a flow file transferred to failure relationship
Given a GenerateFlowFile processor with the "File Size" property set to "0B"
And a UpdateAttribute processor with the "my.attribute" property set to "my.value"
And the "error.message" property of the UpdateAttribute processor is set to "Old error"
And a FailureWithAttributes processor
And a LogAttribute processor
And python processors without dependencies are present on the MiNiFi agent
And the "success" relationship of the GenerateFlowFile processor is connected to the UpdateAttribute
And the "success" relationship of the UpdateAttribute processor is connected to the FailureWithAttributes
And the "failure" relationship of the FailureWithAttributes processor is connected to the LogAttribute
When all instances start up
Then the Minifi logs contain the following message: "key:error.message value:Error" in less than 60 seconds
And the Minifi logs contain the following message: "key:my.attribute value:my.value" in less than 10 seconds
Scenario: NiFi native python processors support relative imports
Given a GenerateFlowFile processor with the "File Size" property set to "0B"
And a RelativeImporterProcessor processor
And a PutFile processor with the "Directory" property set to "/tmp/output"
And python processors without dependencies are present on the MiNiFi agent
And the "success" relationship of the GenerateFlowFile processor is connected to the RelativeImporterProcessor
And the "success" relationship of the RelativeImporterProcessor processor is connected to the PutFile
When all instances start up
Then one flowfile with the contents "The final result is 1990" is placed in the monitored directory in less than 30 seconds
Scenario: NiFi native python processor is allowed to be triggered without creating any flow files
Given a CreateNothing processor
And a PutFile processor with the "Directory" property set to "/tmp/output"
And the "success" relationship of the CreateNothing processor is connected to the PutFile
And python processors without dependencies are present on the MiNiFi agent
When the MiNiFi instance starts up
Then no files are placed in the monitored directory in 10 seconds of running time
And the Minifi logs do not contain the following message: "Caught Exception during SchedulingAgent::onTrigger of processor CreateNothing" after 1 seconds
Scenario: NiFi native python processor cannot specify content of failure result
Given a GenerateFlowFile processor with the "File Size" property set to "0B"
And a FailureWithContent processor
And python processors without dependencies are present on the MiNiFi agent
And the "success" relationship of the GenerateFlowFile processor is connected to the FailureWithContent
When all instances start up
Then the Minifi logs contain the following message: "'failure' relationship should not have content, the original flow file will be transferred automatically in this case." in less than 60 seconds
Scenario: NiFi native python processor cannot transfer to original relationship
Given a GenerateFlowFile processor with the "File Size" property set to "0B"
And a TransferToOriginal processor
And python processors without dependencies are present on the MiNiFi agent
And the "success" relationship of the GenerateFlowFile processor is connected to the TransferToOriginal
When all instances start up
Then the Minifi logs contain the following message: "Result relationship cannot be 'original', it is reserved for the original flow file, and transferred automatically in non-failure cases." in less than 60 seconds
Scenario: MiNiFi C++ supports RecordTransform native python processors
Given a GetFile processor with the "Input Directory" property set to "/tmp/input"
And a file with the content '{"group": "group1", "name": "John"}\n{"group": "group1", "name": "Jane"}\n{"group": "group2", "name": "Kyle"}\n{"name": "Zoe"}' is present in '/tmp/input'
And a file with the content '{"group": "group1", "name": "Steve"}\n{}' is present in '/tmp/input'
And a SetRecordField processor with the "Record Reader" property set to "JsonTreeReader"
And the "Record Writer" property of the SetRecordField processor is set to "JsonRecordSetWriter"
And a JsonTreeReader controller service is set up
And a JsonRecordSetWriter controller service is set up with "Array" output grouping
And a LogAttribute processor with the "FlowFiles To Log" property set to "0"
And the "Log Payload" property of the LogAttribute processor is set to "true"
And python processors without dependencies are present on the MiNiFi agent
And the "success" relationship of the GetFile processor is connected to the SetRecordField
And the "success" relationship of the SetRecordField processor is connected to the LogAttribute
When all instances start up
Then the Minifi logs contain the following message: '[{"group":"group1","name":"John"},{"group":"group1","name":"Jane"}]' in less than 60 seconds
And the Minifi logs contain the following message: '[{"group":"group2","name":"Kyle"}]' in less than 5 seconds
And the Minifi logs contain the following message: '[{"name":"Zoe"}]' in less than 5 seconds
And the Minifi logs contain the following message: '[{"group":"group1","name":"Steve"}]' in less than 5 seconds
And the Minifi logs contain the following message: '[{}]' in less than 5 seconds
Scenario: MiNiFi C++ can use state manager commands in native NiFi python processors
Given a TestStateManager processor
And a LogAttribute processor with the "FlowFiles To Log" property set to "0"
And the "success" relationship of the TestStateManager processor is connected to the LogAttribute
And python processors without dependencies are present on the MiNiFi agent
When the MiNiFi instance starts up
Then the Minifi logs contain the following message: "key:state_key value:1" in less than 60 seconds
And the Minifi logs contain the following message: "key:state_key value:2" in less than 60 seconds
Scenario: MiNiFi C++ can use dynamic properties in native NiFi python processors
Given a GenerateFlowFile processor with the "File Size" property set to "0B"
And a NifiStyleLogDynamicProperties processor with the "Static Property" property set to "static value"
And the "Dynamic Property" property of the NifiStyleLogDynamicProperties processor is set to "dynamic value"
And the "success" relationship of the GenerateFlowFile processor is connected to the NifiStyleLogDynamicProperties
And python processors without dependencies are present on the MiNiFi agent
When all instances start up
Then the Minifi logs contain the following message: "Static Property value: static value" in less than 60 seconds
And the Minifi logs contain the following message: "Dynamic Property value: dynamic value" in less than 60 seconds