This readme defines the configuration parameters to use ExecutePythonProcessor to run native python processors.
This extension targets the stable Python API of version 3.6, which means it will work with any Python library of version 3.6 or later.
yum install python3-libs
apt install libpython3-dev
Debian/Ubuntu doesn't provide the generic libpython3.so, but the extension works with the specific libraries as well. To use the extension on a system where the generic libpython3.so is not available, we must patch the library to use the specific library.
For example, the following command changes the dependency from the generic libpython3.so to the specific libpython3.9.so:
patchelf extensions/libminifi-python-script-extension.so --replace-needed libpython3.so libpython3.9.so
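The specific library name depends on the Python version installed on the system. As a minimal sketch, assuming the library follows the libpython<major>.<minor>.so naming used in the example above, the name can be derived from the interpreter version. The helper name specific_libpython_name is hypothetical, and the result should be checked against the files actually installed before patching.

```python
import sys


def specific_libpython_name(version=None):
    """Return the version-specific libpython file name, e.g. libpython3.9.so.

    Assumption: the libpython<major>.<minor>.so naming shown in the patchelf
    example applies. The name is built from the version only; the file system
    is not inspected.
    """
    if version is None:
        # Default to the interpreter running this script.
        version = sys.version_info
    major, minor = version[0], version[1]
    return f"libpython{major}.{minor}.so"


if __name__ == "__main__":
    # Prints the name matching the interpreter running this script.
    print(specific_libpython_name())
```

Run it with the same python3 that MiNiFi is expected to load, so that the printed name corresponds to that installation.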
When using Anaconda, make sure MiNiFi can find the Anaconda libraries, for example:
export LD_LIBRARY_PATH="${CONDA_PREFIX}/lib${LD_LIBRARY_PATH:+:${LD_LIBRARY_PATH}}"
When using pyenv, make sure MiNiFi can find the pyenv libraries, for example:
export LD_LIBRARY_PATH="${PYENV_ROOT}/versions/${PY_VERSION}/lib${LD_LIBRARY_PATH:+:${LD_LIBRARY_PATH}}"
Python native processors can be updated at any time by simply adding a new processor to the directory defined in the configuration options. The processor name provided to MiNiFi C++ and to any C2 manifest is the name of the Python script. For example, “AttributePrinter.py” will be named and referenced in the flow as “org.apache.nifi.minifi.processors.AttributePrinter”.
Methods that are enabled within the processor are describe, onSchedule, onInitialize, and onTrigger.
The describe function is passed the processor and is required. You must set the description like so:
def describe(processor):
    processor.setDescription("Adds an attribute to your flow files")
onInitialize is also passed the processor reference and is where you can set properties. The first argument of addProperty is the property display name, followed by the description and the default value. The last two arguments are booleans stating whether the property is required and whether it requires Expression Language (EL).
def onInitialize(processor):
    processor.setSupportsDynamicProperties()
    processor.addProperty("property name","description","default value", True, False)
The onSchedule function is passed the context and session factory. This should be where your processor loads and reads properties via the getProperty function. onTrigger is executed and passed the processor context and session. You may keep state within your processor.
Much like C++ processors, callbacks may be defined for reading and writing streams of data through the session. Those callback classes have a process function that accepts the input stream. You may use codecs.getreader to read that data, as in the example below from VaderSentiment:
import codecs

class VaderSentiment(object):
    def __init__(self):
        self.content = None

    def process(self, input_stream):
        # Decode the incoming byte stream as UTF-8 text
        self.content = codecs.getreader('utf-8')(input_stream).read()
        return len(self.content)
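A reader callback of this shape can be tried outside MiNiFi by passing it any binary stream. The sketch below is an illustration under assumptions: ContentReader is a hypothetical name for a class with the same process method, and io.BytesIO merely stands in for the stream that MiNiFi would supply.

```python
import codecs
import io


class ContentReader(object):
    """Hypothetical callback with the same shape as the example above."""

    def __init__(self):
        self.content = None

    def process(self, input_stream):
        # Wrap the byte stream in a UTF-8 decoder and read all of it.
        self.content = codecs.getreader('utf-8')(input_stream).read()
        return len(self.content)


reader = ContentReader()
# "caf\u00e9" is 4 characters but 5 bytes once encoded as UTF-8.
payload = "caf\u00e9".encode("utf-8")
count = reader.process(io.BytesIO(payload))
print(count, len(payload))
```

Note that the value returned by process here is the number of decoded characters, which can be smaller than the number of bytes read when the content contains non-ASCII text.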
To enable Python processor capabilities, the following option needs to be provided in minifi.properties. The specified directory contains the processors. Note that the processor name will be the reference in your flow. Directories are treated like package names. Therefore, if nifi.python.processor.dir is /tmp/ and it has a subdirectory named packagedir containing a file named file.py, it will produce a processor with the name org.apache.nifi.minifi.processors.packagedir.file. Each subdirectory level appends a package to the reference class name.
in minifi.properties
#directory where processors exist
nifi.python.processor.dir=XXXX
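The naming rule above can be sketched as a small function. This is only an illustration of the mapping as described in this readme, not the code MiNiFi uses. The function name processor_name and the PREFIX constant are hypothetical, and POSIX-style paths are assumed.

```python
from pathlib import PurePosixPath

# Prefix taken from the examples in this readme.
PREFIX = "org.apache.nifi.minifi.processors"


def processor_name(processor_dir, script_path):
    """Derive the flow reference name for a script under processor_dir."""
    relative = PurePosixPath(script_path).relative_to(PurePosixPath(processor_dir))
    # Each subdirectory becomes a package; the file name without .py is the class.
    parts = list(relative.parent.parts) + [relative.stem]
    return ".".join([PREFIX] + parts)


if __name__ == "__main__":
    print(processor_name("/tmp/", "/tmp/packagedir/file.py"))
    print(processor_name("/tmp/", "/tmp/AttributePrinter.py"))
```

A script placed directly in the configured directory gets no extra package component, which matches the AttributePrinter example earlier in this readme.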
The python directory (extensions/pythonprocessors) contains implementations that will be available for flows if the required dependencies exist.
The SentimentAnalysis processor will perform a Vader Sentiment Analysis. This requires that you install nltk and VaderSentiment:
pip install nltk
pip install VaderSentiment