blob: 06793c26c32894e8f6eb9fa3037c7b60f56831f8 [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from .containers.MinifiContainer import MinifiContainer
import logging
import tarfile
import docker
from io import BytesIO
from textwrap import dedent
import os
class PythonWithDependenciesOptions:
REQUIREMENTS_FILE = 0
SYSTEM_INSTALLED_PACKAGES = 1
INLINE_DEFINED_PACKAGES = 2
class ImageStore:
def __init__(self):
self.client = docker.from_env()
self.images = dict()
self.test_dir = os.environ['TEST_DIRECTORY'] # Based on DockerVerify.sh
def cleanup(self):
# Clean up images
for image in self.images.values():
logging.info('Cleaning up image: %s', image.id)
self.client.images.remove(image.id, force=True)
def get_image(self, container_engine):
if container_engine in self.images:
return self.images[container_engine]
if container_engine == "minifi-cpp-sql":
image = self.__build_minifi_cpp_sql_image()
elif container_engine == "minifi-cpp-with-example-python-processors":
image = self.__build_minifi_cpp_image_with_example_minifi_python_processors()
elif container_engine == "minifi-cpp-nifi-python":
image = self.__build_minifi_cpp_image_with_nifi_python_processors_using_dependencies(PythonWithDependenciesOptions.REQUIREMENTS_FILE)
elif container_engine == "minifi-cpp-nifi-python-system-python-packages":
image = self.__build_minifi_cpp_image_with_nifi_python_processors_using_dependencies(PythonWithDependenciesOptions.SYSTEM_INSTALLED_PACKAGES)
elif container_engine == "minifi-cpp-nifi-with-inline-python-dependencies":
image = self.__build_minifi_cpp_image_with_nifi_python_processors_using_dependencies(PythonWithDependenciesOptions.INLINE_DEFINED_PACKAGES)
elif container_engine == "minifi-cpp-nifi-with-python-without-dependencies":
image = self.__build_minifi_cpp_image_with_nifi_python_processors()
elif container_engine == "minifi-cpp-with-llamacpp-model":
image = self.__build_minifi_cpp_image_with_llamacpp_model()
elif container_engine == "http-proxy":
image = self.__build_http_proxy_image()
elif container_engine == "postgresql-server":
image = self.__build_postgresql_server_image()
elif container_engine == "mqtt-broker":
image = self.__build_mqtt_broker_image()
elif container_engine == "splunk":
image = self.__build_splunk_image()
elif container_engine == "kinesis-server":
image = self.__build_kinesis_image()
elif container_engine == "reverse-proxy":
image = self.__build_reverse_proxy_image()
elif container_engine == "diag-slave-tcp":
image = self.__build_diagslave_image()
else:
raise Exception("There is no associated image for " + container_engine)
self.images[container_engine] = image
return image
def __build_minifi_cpp_sql_image(self):
if "rocky" in MinifiContainer.MINIFI_TAG_PREFIX:
install_sql_cmd = "dnf -y install postgresql-odbc"
so_location = "psqlodbca.so"
elif "bullseye" in MinifiContainer.MINIFI_TAG_PREFIX or "bookworm" in MinifiContainer.MINIFI_TAG_PREFIX:
install_sql_cmd = "apt -y install odbc-postgresql"
so_location = "/usr/lib/$(gcc -dumpmachine)/odbc/psqlodbca.so"
elif "jammy" in MinifiContainer.MINIFI_TAG_PREFIX or "noble" in MinifiContainer.MINIFI_TAG_PREFIX:
install_sql_cmd = "apt -y install odbc-postgresql"
so_location = "/usr/lib/$(gcc -dumpmachine)/odbc/psqlodbca.so"
else:
install_sql_cmd = "apk --update --no-cache add psqlodbc"
so_location = "psqlodbca.so"
dockerfile = dedent("""\
FROM {base_image}
USER root
RUN {install_sql_cmd}
RUN echo "[PostgreSQL ANSI]" > /odbcinst.ini.template && \
echo "Description=PostgreSQL ODBC driver (ANSI version)" >> /odbcinst.ini.template && \
echo "Driver={so_location}" >> /odbcinst.ini.template && \
echo "Setup=libodbcpsqlS.so" >> /odbcinst.ini.template && \
echo "Debug=0" >> /odbcinst.ini.template && \
echo "CommLog=1" >> /odbcinst.ini.template && \
echo "UsageCount=1" >> /odbcinst.ini.template && \
echo "" >> /odbcinst.ini.template && \
echo "[PostgreSQL Unicode]" >> /odbcinst.ini.template && \
echo "Description=PostgreSQL ODBC driver (Unicode version)" >> /odbcinst.ini.template && \
echo "Driver=psqlodbcw.so" >> /odbcinst.ini.template && \
echo "Setup=libodbcpsqlS.so" >> /odbcinst.ini.template && \
echo "Debug=0" >> /odbcinst.ini.template && \
echo "CommLog=1" >> /odbcinst.ini.template && \
echo "UsageCount=1" >> /odbcinst.ini.template
RUN odbcinst -i -d -f /odbcinst.ini.template
RUN echo "[ODBC]" > /etc/odbc.ini && \
echo "Driver = PostgreSQL ANSI" >> /etc/odbc.ini && \
echo "Description = PostgreSQL Data Source" >> /etc/odbc.ini && \
echo "Servername = postgres" >> /etc/odbc.ini && \
echo "Port = 5432" >> /etc/odbc.ini && \
echo "Protocol = 8.4" >> /etc/odbc.ini && \
echo "UserName = postgres" >> /etc/odbc.ini && \
echo "Password = password" >> /etc/odbc.ini && \
echo "Database = postgres" >> /etc/odbc.ini
USER minificpp
""".format(base_image='apacheminificpp:' + MinifiContainer.MINIFI_TAG_PREFIX + MinifiContainer.MINIFI_VERSION,
install_sql_cmd=install_sql_cmd, so_location=so_location))
return self.__build_image(dockerfile)
def __build_minifi_cpp_image_with_example_minifi_python_processors(self):
dockerfile = dedent("""\
FROM {base_image}
RUN cp -r {minifi_python_examples_dir} {minifi_python_dir}/examples
""".format(base_image='apacheminificpp:' + MinifiContainer.MINIFI_TAG_PREFIX + MinifiContainer.MINIFI_VERSION,
minifi_python_dir=MinifiContainer.MINIFI_LOCATIONS.minifi_python_dir_path,
minifi_python_examples_dir=MinifiContainer.MINIFI_LOCATIONS.minifi_python_examples))
return self.__build_image(dockerfile)
def __build_minifi_cpp_image_with_nifi_python_processors_using_dependencies(self, python_option):
parse_document_url = "https://raw.githubusercontent.com/apache/nifi-python-extensions/refs/heads/main/src/extensions/chunking/ParseDocument.py"
chunk_document_url = "https://raw.githubusercontent.com/apache/nifi-python-extensions/refs/heads/main/src/extensions/chunking/ChunkDocument.py"
pip3_install_command = ""
requirements_install_command = ""
additional_cmd = ""
# The following sed command is used to remove the existing dependencies from the ParseDocument and ChunkDocument processors
# /class ProcessorDetails:/,/^$/: Do the following between 'class ProcessorDetails:' and the first empty line (so we don't modify other PropertyDescriptor blocks below)
# /^\s*dependencies\s*=/,/\]\s*$/: Do the following between 'dependencies =' at the start of a line, and ']' at the end of a line
# d: Delete line
parse_document_sed_cmd = 'sed -i "/class ProcessorDetails:/,/^$/{{/^\\s*dependencies\\s*=/,/\\]\\s*$/d}}" {minifi_python_dir}/nifi_python_processors/ParseDocument.py && \\'.format(minifi_python_dir=MinifiContainer.MINIFI_LOCATIONS.minifi_python_dir_path)
chunk_document_sed_cmd = 'sed -i "/class ProcessorDetails:/,/^$/{{/^\\s*dependencies\\s*=/,/\\]\\s*$/d}}" {minifi_python_dir}/nifi_python_processors/ChunkDocument.py && \\'.format(minifi_python_dir=MinifiContainer.MINIFI_LOCATIONS.minifi_python_dir_path)
if python_option == PythonWithDependenciesOptions.SYSTEM_INSTALLED_PACKAGES:
if not MinifiContainer.MINIFI_TAG_PREFIX or "bookworm" in MinifiContainer.MINIFI_TAG_PREFIX or "noble" in MinifiContainer.MINIFI_TAG_PREFIX or "trixie" in MinifiContainer.MINIFI_TAG_PREFIX:
additional_cmd = "RUN pip3 install --break-system-packages 'langchain<=0.17.0'"
else:
additional_cmd = "RUN pip3 install 'langchain<=0.17.0'"
elif python_option == PythonWithDependenciesOptions.REQUIREMENTS_FILE:
requirements_install_command = "echo 'langchain<=0.17.0' > {minifi_python_dir}/nifi_python_processors/requirements.txt && \\".format(minifi_python_dir=MinifiContainer.MINIFI_LOCATIONS.minifi_python_dir_path)
elif python_option == PythonWithDependenciesOptions.INLINE_DEFINED_PACKAGES:
parse_document_sed_cmd = parse_document_sed_cmd[:-2] + ' sed -i "s/langchain==[0-9.]\\+/langchain<=0.17.0/" {minifi_python_dir}/nifi_python_processors/ParseDocument.py && \\'.format(minifi_python_dir=MinifiContainer.MINIFI_LOCATIONS.minifi_python_dir_path)
chunk_document_sed_cmd = 'sed -i "s/\\[\\\'langchain\\\'\\]/\\[\\\'langchain<=0.17.0\\\'\\]/" {minifi_python_dir}/nifi_python_processors/ChunkDocument.py && \\'.format(minifi_python_dir=MinifiContainer.MINIFI_LOCATIONS.minifi_python_dir_path)
if not MinifiContainer.MINIFI_TAG_PREFIX:
pip3_install_command = "RUN apk --update --no-cache add py3-pip"
dockerfile = dedent("""\
FROM {base_image}
USER root
{pip3_install_command}
{additional_cmd}
USER minificpp
RUN wget {parse_document_url} --directory-prefix={minifi_python_dir}/nifi_python_processors && \\
wget {chunk_document_url} --directory-prefix={minifi_python_dir}/nifi_python_processors && \\
echo 'langchain<=0.17.0' > {minifi_python_dir}/nifi_python_processors/requirements.txt && \\
{requirements_install_command}
{parse_document_sed_cmd}
{chunk_document_sed_cmd}
python3 -m venv {minifi_python_venv_parent}/venv && \\
python3 -m venv {minifi_python_venv_parent}/venv-with-langchain && \\
. {minifi_python_venv_parent}/venv-with-langchain/bin/activate && python3 -m pip install --no-cache-dir "langchain<=0.17.0" && \\
deactivate
""".format(base_image='apacheminificpp:' + MinifiContainer.MINIFI_TAG_PREFIX + MinifiContainer.MINIFI_VERSION,
pip3_install_command=pip3_install_command,
parse_document_url=parse_document_url,
chunk_document_url=chunk_document_url,
additional_cmd=additional_cmd,
requirements_install_command=requirements_install_command,
parse_document_sed_cmd=parse_document_sed_cmd,
chunk_document_sed_cmd=chunk_document_sed_cmd,
minifi_python_dir=MinifiContainer.MINIFI_LOCATIONS.minifi_python_dir_path,
minifi_python_venv_parent=MinifiContainer.MINIFI_LOCATIONS.minifi_python_venv_parent))
return self.__build_image(dockerfile)
def __build_minifi_cpp_image_with_nifi_python_processors(self):
pip3_install_command = ""
if not MinifiContainer.MINIFI_TAG_PREFIX:
pip3_install_command = "RUN apk --update --no-cache add py3-pip"
dockerfile = dedent("""\
FROM {base_image}
USER root
{pip3_install_command}
USER minificpp
COPY RotatingForwarder.py {minifi_python_dir}/nifi_python_processors/RotatingForwarder.py
COPY SpecialPropertyTypeChecker.py {minifi_python_dir}/nifi_python_processors/SpecialPropertyTypeChecker.py
COPY ProcessContextInterfaceChecker.py {minifi_python_dir}/nifi_python_processors/ProcessContextInterfaceChecker.py
COPY CreateFlowFile.py {minifi_python_dir}/nifi_python_processors/CreateFlowFile.py
COPY FailureWithAttributes.py {minifi_python_dir}/nifi_python_processors/FailureWithAttributes.py
COPY subtractutils.py {minifi_python_dir}/nifi_python_processors/compute/subtractutils.py
COPY RelativeImporterProcessor.py {minifi_python_dir}/nifi_python_processors/compute/processors/RelativeImporterProcessor.py
COPY multiplierutils.py {minifi_python_dir}/nifi_python_processors/compute/processors/multiplierutils.py
COPY CreateNothing.py {minifi_python_dir}/nifi_python_processors/CreateNothing.py
COPY FailureWithContent.py {minifi_python_dir}/nifi_python_processors/FailureWithContent.py
COPY TransferToOriginal.py {minifi_python_dir}/nifi_python_processors/TransferToOriginal.py
COPY SetRecordField.py {minifi_python_dir}/nifi_python_processors/SetRecordField.py
COPY TestStateManager.py {minifi_python_dir}/nifi_python_processors/TestStateManager.py
COPY NifiStyleLogDynamicProperties.py {minifi_python_dir}/nifi_python_processors/NifiStyleLogDynamicProperties.py
COPY LogDynamicProperties.py {minifi_python_dir}/LogDynamicProperties.py
COPY ExpressionLanguagePropertyWithValidator.py {minifi_python_dir}/nifi_python_processors/ExpressionLanguagePropertyWithValidator.py
COPY EvaluateExpressionLanguageChecker.py {minifi_python_dir}/nifi_python_processors/EvaluateExpressionLanguageChecker.py
RUN python3 -m venv {minifi_python_venv_parent}/venv
""".format(base_image='apacheminificpp:' + MinifiContainer.MINIFI_TAG_PREFIX + MinifiContainer.MINIFI_VERSION,
pip3_install_command=pip3_install_command,
minifi_python_dir=MinifiContainer.MINIFI_LOCATIONS.minifi_python_dir_path,
minifi_python_venv_parent=MinifiContainer.MINIFI_LOCATIONS.minifi_python_venv_parent))
def build_full_python_resource_path(resource):
return os.path.join(self.test_dir, "resources", "python", resource)
return self.__build_image(dockerfile, [
build_full_python_resource_path("RotatingForwarder.py"),
build_full_python_resource_path("SpecialPropertyTypeChecker.py"),
build_full_python_resource_path("ProcessContextInterfaceChecker.py"),
build_full_python_resource_path("CreateFlowFile.py"),
build_full_python_resource_path("FailureWithAttributes.py"),
build_full_python_resource_path("RelativeImporterProcessor.py"),
build_full_python_resource_path("subtractutils.py"),
build_full_python_resource_path("multiplierutils.py"),
build_full_python_resource_path("CreateNothing.py"),
build_full_python_resource_path("FailureWithContent.py"),
build_full_python_resource_path("TransferToOriginal.py"),
build_full_python_resource_path("SetRecordField.py"),
build_full_python_resource_path("TestStateManager.py"),
build_full_python_resource_path("NifiStyleLogDynamicProperties.py"),
build_full_python_resource_path("LogDynamicProperties.py"),
build_full_python_resource_path("ExpressionLanguagePropertyWithValidator.py"),
build_full_python_resource_path("EvaluateExpressionLanguageChecker.py")
])
def __build_minifi_cpp_image_with_llamacpp_model(self):
dockerfile = dedent("""\
FROM {base_image}
RUN mkdir {models_path} && wget https://huggingface.co/bartowski/Qwen2-0.5B-Instruct-GGUF/resolve/main/Qwen2-0.5B-Instruct-IQ3_M.gguf --directory-prefix={models_path}
""".format(base_image='apacheminificpp:' + MinifiContainer.MINIFI_TAG_PREFIX + MinifiContainer.MINIFI_VERSION,
models_path=MinifiContainer.MINIFI_LOCATIONS.models_path))
return self.__build_image(dockerfile)
def __build_http_proxy_image(self):
dockerfile = dedent("""\
FROM {base_image}
RUN apt -y update && apt install -y apache2-utils
RUN htpasswd -b -c /etc/squid/.squid_users {proxy_username} {proxy_password}
RUN echo 'auth_param basic program /usr/lib/squid/basic_ncsa_auth /etc/squid/.squid_users' > /etc/squid/squid.conf && \
echo 'auth_param basic realm proxy' >> /etc/squid/squid.conf && \
echo 'acl authenticated proxy_auth REQUIRED' >> /etc/squid/squid.conf && \
echo 'http_access allow authenticated' >> /etc/squid/squid.conf && \
echo 'http_port {proxy_port}' >> /etc/squid/squid.conf
""".format(base_image='ubuntu/squid:5.2-22.04_beta', proxy_username='admin', proxy_password='test101', proxy_port='3128'))
return self.__build_image(dockerfile)
def __build_postgresql_server_image(self):
dockerfile = dedent("""\
FROM {base_image}
RUN mkdir -p /docker-entrypoint-initdb.d
RUN echo "#!/bin/bash" > /docker-entrypoint-initdb.d/init-user-db.sh && \
echo "set -e" >> /docker-entrypoint-initdb.d/init-user-db.sh && \
echo "psql -v ON_ERROR_STOP=1 --username "postgres" --dbname "postgres" <<-EOSQL" >> /docker-entrypoint-initdb.d/init-user-db.sh && \
echo " CREATE TABLE test_table (int_col INTEGER, text_col TEXT);" >> /docker-entrypoint-initdb.d/init-user-db.sh && \
echo " INSERT INTO test_table (int_col, text_col) VALUES (1, 'apple');" >> /docker-entrypoint-initdb.d/init-user-db.sh && \
echo " INSERT INTO test_table (int_col, text_col) VALUES (2, 'banana');" >> /docker-entrypoint-initdb.d/init-user-db.sh && \
echo " INSERT INTO test_table (int_col, text_col) VALUES (3, 'pear');" >> /docker-entrypoint-initdb.d/init-user-db.sh && \
echo " CREATE TABLE test_table2 (int_col INTEGER, \\"tExT_Col\\" TEXT);" >> /docker-entrypoint-initdb.d/init-user-db.sh && \
echo " INSERT INTO test_table2 (int_col, \\"tExT_Col\\") VALUES (5, 'ApPlE');" >> /docker-entrypoint-initdb.d/init-user-db.sh && \
echo " INSERT INTO test_table2 (int_col, \\"tExT_Col\\") VALUES (6, 'BaNaNa');" >> /docker-entrypoint-initdb.d/init-user-db.sh && \
echo "EOSQL" >> /docker-entrypoint-initdb.d/init-user-db.sh
""".format(base_image='postgres:17.4'))
return self.__build_image(dockerfile)
def __build_mqtt_broker_image(self):
dockerfile = dedent("""\
FROM {base_image}
RUN echo 'log_dest stderr' >> /mosquitto-no-auth.conf
CMD ["/usr/sbin/mosquitto", "--verbose", "--config-file", "/mosquitto-no-auth.conf"]
""".format(base_image='eclipse-mosquitto:2.0.14'))
return self.__build_image(dockerfile)
def __build_kinesis_image(self):
return self.__build_image_by_path(self.test_dir + "/resources/kinesis-mock", 'kinesis-server')
def __build_splunk_image(self):
return self.__build_image_by_path(self.test_dir + "/resources/splunk-hec", 'minifi-splunk')
def __build_reverse_proxy_image(self):
return self.__build_image_by_path(self.test_dir + "/resources/reverse-proxy", 'reverse-proxy')
def __build_diagslave_image(self):
return self.__build_image_by_path(self.test_dir + "/resources/diagslave", 'diag-slave-tcp')
def __build_image(self, dockerfile, context_files=[]):
conf_dockerfile_buffer = BytesIO()
docker_context_buffer = BytesIO()
try:
# Overlay conf onto base nifi image
conf_dockerfile_buffer.write(dockerfile.encode())
conf_dockerfile_buffer.seek(0)
with tarfile.open(mode='w', fileobj=docker_context_buffer) as docker_context:
dockerfile_info = tarfile.TarInfo('Dockerfile')
dockerfile_info.size = conf_dockerfile_buffer.getbuffer().nbytes
docker_context.addfile(dockerfile_info,
fileobj=conf_dockerfile_buffer)
for context_file_path in context_files:
with open(context_file_path, 'rb') as file:
file_info = tarfile.TarInfo(os.path.basename(context_file_path))
file_info.size = os.path.getsize(context_file_path)
docker_context.addfile(file_info, file)
docker_context_buffer.seek(0)
logging.info('Creating configured image...')
image = self.client.images.build(fileobj=docker_context_buffer,
custom_context=True,
rm=True,
forcerm=True)
logging.info('Created image with id: %s', image[0].id)
finally:
conf_dockerfile_buffer.close()
docker_context_buffer.close()
return image[0]
def __build_image_by_path(self, dir, name=None):
try:
logging.info('Creating configured image...')
image = self.client.images.build(path=dir,
tag=name,
rm=True,
forcerm=True)
logging.info('Created image with id: %s', image[0].id)
return image[0]
except Exception as e:
logging.info(e)
raise
def get_minifi_image_python_version(self):
result = self.client.containers.run(
image='apacheminificpp:' + MinifiContainer.MINIFI_TAG_PREFIX + MinifiContainer.MINIFI_VERSION,
command=['python3', '-c', 'import platform; print(platform.python_version())'],
remove=True
)
python_ver_str = result.decode('utf-8')
logging.info('MiNiFi python version: %s', python_ver_str)
return tuple(map(int, python_ver_str.split('.')))
def is_conda_available_in_minifi_image(self):
container = self.client.containers.create(
image='apacheminificpp:' + MinifiContainer.MINIFI_TAG_PREFIX + MinifiContainer.MINIFI_VERSION,
command=['conda', '--version'],
)
try:
container.start()
result = container.logs()
container.remove(force=True)
except docker.errors.APIError:
container.remove(force=True)
return False
return result.decode('utf-8').startswith('conda ')