blob: 0b485940bd26b18cb051358d7bb1316fe487f938 [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import datetime
import sys
import shortuuid
import os
import platform
sys.path.append('../minifi')
from MiNiFi_integration_test_driver import MiNiFi_integration_test # noqa: E402
from minifi import * # noqa
from cluster.ImageStore import ImageStore # noqa
from cluster.DockerTestDirectoryBindings import DockerTestDirectoryBindings # noqa
from cluster.KubernetesProxy import KubernetesProxy # noqa
def inject_feature_id(context, step):
    """Replace the ``${feature_id}`` placeholder in a step with the feature id.

    The placeholder is substituted in the step's name and, when the step has
    a table, in every cell of every table row. All changes are made in place.

    Args:
        context: Object exposing the ``feature_id`` string to substitute.
        step: Step object with a ``name`` attribute and a ``table`` attribute
            (falsy when there is no table) whose rows expose a mutable
            ``cells`` sequence of strings.
    """
    placeholder = "${feature_id}"

    if placeholder in step.name:
        step.name = step.name.replace(placeholder, context.feature_id)

    if not step.table:
        return

    for row in step.table:
        # enumerate() yields the index needed to write the cell back in place.
        for index, cell in enumerate(row.cells):
            if placeholder in cell:
                row.cells[index] = cell.replace(placeholder, context.feature_id)
def before_scenario(context, scenario):
    """Set up the integration test driver for a scenario, or skip the scenario.

    Scenarios tagged ``skip`` are skipped before any setup happens. Otherwise
    a ``MiNiFi_integration_test`` instance is stored on ``context.test``.
    Scenarios tagged ``USE_NIFI_PYTHON_PROCESSORS_WITH_LANGCHAIN`` are skipped
    when the image store reports that conda is not available in the MiNiFi
    image and the image's Python version is below 3.8.1. For scenarios that
    are not skipped, the ``${feature_id}`` placeholder is substituted in
    every step.

    Args:
        context: Test context; must already carry ``feature_id`` and
            ``image_store``.
        scenario: The scenario about to run.
    """
    if "skip" in scenario.effective_tags:
        scenario.skip("Marked with @skip")
        return

    setup_time = datetime.datetime.now()
    logging.info("Integration test setup at {time:%H:%M:%S.%f}".format(time=setup_time))

    # The driver is created before the langchain check below, so a scenario
    # skipped by that check still has a context.test for teardown to clean up.
    context.test = MiNiFi_integration_test(context=context, feature_id=context.feature_id)

    if "USE_NIFI_PYTHON_PROCESSORS_WITH_LANGCHAIN" in scenario.effective_tags:
        images = context.image_store
        if not images.is_conda_available_in_minifi_image():
            if images.get_minifi_image_python_version() < (3, 8, 1):
                scenario.skip("NiFi Python processor tests use langchain library which requires Python 3.8.1 or later.")
                return

    for scenario_step in scenario.steps:
        inject_feature_id(context, scenario_step)
def after_scenario(context, scenario):
    """Tear down everything that was set up for a scenario.

    Scenarios tagged ``skip`` need no teardown. For all others the test
    driver is cleaned up, the directory bindings' I/O is cleaned up and, when
    a Kubernetes proxy is active, its pods are deleted.

    Each teardown step runs even if an earlier one raises, so a failing
    cleanup does not leak state into the next scenario. The exception still
    propagates to the caller afterwards.

    Args:
        context: Test context carrying ``test``, ``directory_bindings`` and
            ``kubernetes_proxy``.
        scenario: The scenario that just finished.
    """
    if "skip" in scenario.effective_tags:
        logging.info("Scenario was skipped, no need for clean up.")
        return

    logging.info("Integration test teardown at {time:%H:%M:%S.%f}".format(time=datetime.datetime.now()))
    try:
        context.test.cleanup()
    finally:
        # Nested try/finally keeps the original order while guaranteeing the
        # remaining cleanup steps are attempted regardless of failures.
        try:
            context.directory_bindings.cleanup_io()
        finally:
            if context.kubernetes_proxy:
                context.kubernetes_proxy.delete_pods()
def before_all(context):
    """Initialise state shared by the whole test run.

    Configures logging through the behave config, creates the ``ImageStore``
    and records that no Kubernetes proxy is active yet.

    Args:
        context: Test context to populate.
    """
    context.config.setup_logging()

    store = ImageStore()
    context.image_store = store

    # Stays None until a feature sets up a Kubernetes proxy.
    context.kubernetes_proxy = None
def before_feature(context, feature):
    """Prepare per-feature resources, or skip the feature.

    A feature is skipped, and no resources are created for it, when:

    * it is tagged ``x86_x64_only`` and ``platform.machine()`` is not one of
      ``i386``, ``AMD64`` or ``x86_64``; or
    * it is tagged ``SKIP_RPM`` and the ``MINIFI_TAG_PREFIX`` environment
      variable contains ``rpm`` (an unset variable is treated as not RPM).

    Otherwise a fresh feature id is generated, directory bindings with data
    directories and certificate files are created, and the root CA
    certificate and key are exposed on the context. Features tagged
    ``requires.kubernetes.cluster`` additionally get a ``KubernetesProxy``
    whose cluster is started.

    Args:
        context: Test context to populate.
        feature: The feature about to run.

    Raises:
        KeyError: If a Kubernetes cluster is required and ``TEST_DIRECTORY``
            is not set in the environment.
    """
    if "x86_x64_only" in feature.tags and platform.machine() not in ("i386", "AMD64", "x86_64"):
        feature.skip("This feature is only x86/x64 compatible")
        # Return so a skipped feature does not create directories or certificates.
        return

    # .get() avoids a KeyError when MINIFI_TAG_PREFIX is not set.
    if "SKIP_RPM" in feature.tags and "rpm" in os.environ.get('MINIFI_TAG_PREFIX', ''):
        feature.skip("This feature is not yet supported on RPM installed images")
        return

    feature_id = shortuuid.uuid()
    context.feature_id = feature_id

    context.directory_bindings = DockerTestDirectoryBindings(feature_id)
    context.directory_bindings.create_new_data_directories()
    context.directory_bindings.create_cert_files()
    context.root_ca_cert = context.directory_bindings.root_ca_cert
    context.root_ca_key = context.directory_bindings.root_ca_key

    if "requires.kubernetes.cluster" in feature.tags:
        context.kubernetes_proxy = KubernetesProxy(
            context.directory_bindings.get_data_directories()["kubernetes_temp_dir"],
            os.path.join(os.environ['TEST_DIRECTORY'], 'resources', 'kubernetes', 'pods-etc'))
        context.kubernetes_proxy.create_config(context.directory_bindings.get_directory_bindings())
        context.kubernetes_proxy.start_cluster()
def after_feature(context, feature):
    """Clean up the Kubernetes proxy once a feature that used it has finished.

    Nothing happens unless the feature is tagged
    ``requires.kubernetes.cluster`` and a proxy is currently set on the
    context. After cleanup the proxy reference is reset to ``None``.

    Args:
        context: Test context carrying ``kubernetes_proxy``.
        feature: The feature that just finished.
    """
    if "requires.kubernetes.cluster" not in feature.tags:
        return

    proxy = context.kubernetes_proxy
    if not proxy:
        return

    proxy.cleanup()
    context.kubernetes_proxy = None