# blob: dbe8e05518f86103ecfc3184a5a200f3571a54cd [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import os
import subprocess
import sys
import pytest
# These variables have to be exported before _any_ other part of airflow is
# loaded, so that the unit test mode configuration is in place as early as possible.
tests_directory = os.path.dirname(os.path.realpath(__file__))
os.environ.update(
    {
        "AIRFLOW__CORE__DAGS_FOLDER": os.path.join(tests_directory, "dags"),
        "AIRFLOW__CORE__UNIT_TEST_MODE": "True",
        # An already configured, non-empty region wins; otherwise fall back to us-east-1.
        "AWS_DEFAULT_REGION": os.environ.get("AWS_DEFAULT_REGION") or "us-east-1",
    }
)
@pytest.fixture()
def reset_environment():
    """
    Restores ``os.environ`` to its pre-test state once the test has finished.

    A snapshot of the environment is taken before the test body runs. During
    teardown, variables the test added are removed, variables it changed get
    their original value back, and variables it deleted are re-created.
    """
    init_env = os.environ.copy()
    yield
    # Loop over a snapshot of the keys because entries are deleted while looping.
    for key in list(os.environ):
        if key not in init_env:
            del os.environ[key]
    # Writing back every original entry also restores variables that the test
    # removed; a loop over the current environment alone never visits those.
    os.environ.update(init_env)
@pytest.fixture()
def reset_db():
    """
    Calls ``airflow.utils.db.resetdb()`` before the test body runs.

    Nothing happens after the ``yield``, i.e. there is no teardown step.
    """
    # Imported inside the fixture, at call time, rather than at module import time.
    from airflow.utils.db import resetdb

    resetdb()
    yield
def pytest_addoption(parser):
    """
    Registers the custom command line options in the "airflow" option group.

    :param parser: the pytest option parser handed to this hook.
    """
    # One (flag, keyword arguments) entry per option, registered in this order.
    option_specs = (
        (
            "--with-db-init",
            {
                "action": "store_true",
                "dest": "db_init",
                "help": "Forces database initialization before tests",
            },
        ),
        (
            "--integrations",
            {
                "action": "store",
                "metavar": "INTEGRATIONS",
                "help": "only run tests matching comma separated integrations: "
                        "[cassandra,mongo,openldap,rabbitmq,redis]. "
                        "Use 'all' to select all integrations.",
            },
        ),
        (
            "--backend",
            {
                "action": "store",
                "metavar": "BACKEND",
                "help": "only run tests matching the backend: [sqlite,postgres,mysql].",
            },
        ),
        (
            "--runtime",
            {
                "action": "store",
                "metavar": "RUNTIME",
                "help": "only run tests matching the runtime: [kubernetes].",
            },
        ),
    )
    airflow_group = parser.getgroup("airflow")
    for flag, settings in option_specs:
        airflow_group.addoption(flag, **settings)
def _initialize_and_reset_db(db):
    """
    Runs ``db.initdb()`` on a best-effort basis and then always runs ``db.resetdb()``.

    :param db: the ``airflow.utils.db`` module.
    """
    try:
        db.initdb()
    except Exception:  # pylint: disable=broad-except
        # Deliberately tolerant: a failing initdb is reported and then ignored.
        # Catching ``Exception`` instead of using a bare ``except`` still lets
        # KeyboardInterrupt and SystemExit abort the test session.
        print("Skipping db initialization because database already exists.")
    db.resetdb()


@pytest.fixture(autouse=True, scope="session")
def breeze_test_helper(request):
    """
    Helper that sets up the Airflow testing environment. It does the same thing
    as the old 'run-tests' script.

    The fixture is applied automatically, once per test session:

    * when the SKIP_INIT_DB environment variable is set, nothing is done;
    * with ``--with-db-init`` the database is initialized and reset;
    * otherwise the database is initialized and reset only when the
      ``.airflow_db_initialised`` marker file is missing from the Airflow home,
      and the marker file is created afterwards;
    * when INTEGRATION_KERBEROS is "true", ``kinit`` is run with the keytab named
      by KRB5_KTNAME; if that variable is missing the process exits with status 1.

    :param request: the pytest fixture request, used to read the ``db_init`` option.
    """
    # fixme: this should use some other env variable ex. RUNNING_ON_K8S
    if os.environ.get("SKIP_INIT_DB"):
        print("Skipping db initialization. Tests do not require database")
        return

    print(" AIRFLOW ".center(60, "="))

    # Setup test environment for breeze
    home = os.path.expanduser("~")
    airflow_home = os.environ.get("AIRFLOW_HOME") or os.path.join(home, "airflow")
    print(f"Home of the user: {home}\nAirflow home {airflow_home}")

    # Imported here rather than at module level, so airflow is only loaded
    # when database initialization is not skipped.
    from airflow.utils import db

    # Initialize Airflow db if required
    lock_file = os.path.join(airflow_home, ".airflow_db_initialised")
    if request.config.option.db_init:
        print("Initializing the DB - forced with --with-db-init switch.")
        _initialize_and_reset_db(db)
    elif not os.path.exists(lock_file):
        print(
            "Initializing the DB - first time after entering the container.\n"
            "You can force re-initialization the database by adding --with-db-init switch to run-tests."
        )
        _initialize_and_reset_db(db)
        # Create the (empty) marker file so that later sessions skip the initialization.
        with open(lock_file, "w+"):
            pass
    else:
        print(
            "Skipping initializing of the DB as it was initialized already.\n"
            "You can re-initialize the database by adding --with-db-init flag when running tests."
        )

    integration_kerberos = os.environ.get("INTEGRATION_KERBEROS")
    if integration_kerberos == "true":
        # Initialize kerberos
        kerberos = os.environ.get("KRB5_KTNAME")
        if kerberos:
            subprocess.check_call(["kinit", "-kt", kerberos, "airflow"])
        else:
            print("Kerberos enabled! Please setup KRB5_KTNAME environment variable")
            sys.exit(1)
def pytest_configure(config):
    """
    Registers the custom ``integration``, ``backend`` and ``runtime`` markers.

    :param config: the pytest config object handed to this hook.
    """
    marker_descriptions = (
        "integration(name): mark test to run with named integration",
        "backend(name): mark test to run with named backend",
        "runtime(name): mark test to run with named runtime",
    )
    for description in marker_descriptions:
        config.addinivalue_line("markers", description)
def skip_if_not_marked_with_integration(selected_integrations, item):
    """
    Skips the test unless one of its ``integration`` markers has been selected.

    The test runs when the first argument of any of its ``integration`` markers
    is among the selected names, or when it has at least one ``integration``
    marker and "all" was selected. Tests without such a marker are always skipped.

    :param selected_integrations: the selected integration names, either as a
        list of names or as a single comma-separated string.
    :param item: the pytest test item being set up.
    """
    if isinstance(selected_integrations, str):
        # A raw "a,b,c" value has to be split into names first: ``in`` on a string
        # is a substring search, so "ldap" would wrongly match "openldap".
        selected_names = [name.strip() for name in selected_integrations.split(",")]
    else:
        selected_names = list(selected_integrations)
    for marker in item.iter_markers(name="integration"):
        integration_name = marker.args[0]
        if integration_name in selected_names or "all" in selected_names:
            return
    pytest.skip("The test is skipped because it does not have the right integration marker. "
                "Only tests marked with pytest.mark.integration(INTEGRATION) are run with INTEGRATION"
                " being one of {}. {item}".
                format(selected_integrations, item=item))
def skip_if_not_marked_with_backend(selected_backend, item):
    """
    Skips the test unless one of its ``backend`` markers lists the selected backend.

    :param selected_backend: the backend name to look for in the marker arguments.
    :param item: the pytest test item being set up.
    """
    backend_markers = item.iter_markers(name="backend")
    if any(selected_backend in backend_marker.args for backend_marker in backend_markers):
        return
    reason = (
        "The test is skipped because it does not have the right backend marker "
        "Only tests marked with pytest.mark.backend('{}') are run"
        ": {item}"
    ).format(selected_backend, item=item)
    pytest.skip(reason)
def skip_if_not_marked_with_runtime(selected_runtime, item):
    """
    Skips the test unless one of its ``runtime`` markers names the selected runtime.

    :param selected_runtime: the runtime name compared with each marker's first argument.
    :param item: the pytest test item being set up.
    """
    runtime_markers = item.iter_markers(name="runtime")
    if any(runtime_marker.args[0] == selected_runtime for runtime_marker in runtime_markers):
        return
    pytest.skip(
        "The test is skipped because it has not been selected via --runtime switch. "
        f"Only tests marked with pytest.mark.runtime('{selected_runtime}') are run: {item}"
    )
def skip_if_integration_disabled(marker, item):
    """
    Skips the test unless the environment enables the integration named by the marker.

    The integration counts as enabled only when the ``INTEGRATION_<NAME>``
    environment variable (name upper-cased) is exactly "true".

    :param marker: an ``integration`` marker; its first argument is the integration name.
    :param item: the pytest test item, only used in the skip message.
    """
    integration_name = marker.args[0]
    env_name = "INTEGRATION_" + integration_name.upper()
    env_value = os.environ.get(env_name)
    if env_value == "true":
        return
    pytest.skip(
        f"The test requires {integration_name} integration started and "
        f"{env_name} environment variable to be set to true (it is '{env_value}')."
        f" It can be set by specifying '--integration {integration_name}' at breeze startup"
        f": {item}"
    )
def skip_if_runtime_disabled(marker, item):
    """
    Skips the test unless the RUNTIME environment variable equals the marker's runtime.

    :param marker: a ``runtime`` marker; its first argument is the required runtime name.
    :param item: the pytest test item, only used in the skip message.
    """
    runtime_name = marker.args[0]
    environment_variable_name = "RUNTIME"
    environment_variable_value = os.environ.get(environment_variable_name)
    if environment_variable_value != runtime_name:
        # The message states the value that is actually compared against: the
        # variable has to equal the runtime name, not the literal "true".
        pytest.skip("The test requires {runtime_name} runtime started and "
                    "{} environment variable to be set to '{runtime_name}' (it is '{}')."
                    " It can be set by specifying '--environment {runtime_name}' at breeze startup"
                    ": {item}".
                    format(environment_variable_name, environment_variable_value,
                           runtime_name=runtime_name, item=item))
def skip_if_wrong_backend(marker, item):
    """
    Skips the test unless the BACKEND environment variable is one of the marker's backends.

    :param marker: a ``backend`` marker; its arguments are the accepted backend names.
    :param item: the pytest test item, only used in the skip message.
    """
    valid_backend_names = marker.args
    environment_variable_name = "BACKEND"
    environment_variable_value = os.environ.get(environment_variable_name)
    if not environment_variable_value or environment_variable_value not in valid_backend_names:
        # The message states what is actually checked: the variable has to hold
        # one of the accepted backend names, not the literal "true".
        pytest.skip("The test requires one of {valid_backend_names} backend started and "
                    "{} environment variable to be set to one of them (it is '{}')."
                    " It can be set by specifying backend at breeze startup"
                    ": {item}".
                    format(environment_variable_name, environment_variable_value,
                           valid_backend_names=valid_backend_names, item=item))
def pytest_runtest_setup(item):
    """
    Decides, before each test runs, whether it has to be skipped.

    For each of the ``integration``, ``backend`` and ``runtime`` marker kinds,
    every marker on the test is first checked against the environment. If the
    matching command line option (``--integrations``, ``--backend``,
    ``--runtime``) was given, the test is additionally required to carry a
    marker that matches the selection.

    :param item: the pytest test item about to be set up.
    """
    selected_integrations = item.config.getoption("--integrations")
    # Split the comma-separated option into individual names (surrounding
    # whitespace removed) so that they are compared as whole names.
    selected_integrations_list = (
        [name.strip() for name in selected_integrations.split(",")]
        if selected_integrations else []
    )
    for marker in item.iter_markers(name="integration"):
        skip_if_integration_disabled(marker, item)
    if selected_integrations_list:
        # Pass the parsed list, not the raw string: membership tests on the raw
        # string are substring searches ("ldap" would match "openldap").
        skip_if_not_marked_with_integration(selected_integrations_list, item)

    for marker in item.iter_markers(name="backend"):
        skip_if_wrong_backend(marker, item)
    selected_backend = item.config.getoption("--backend")
    if selected_backend:
        skip_if_not_marked_with_backend(selected_backend, item)

    for marker in item.iter_markers(name="runtime"):
        skip_if_runtime_disabled(marker, item)
    selected_runtime = item.config.getoption("--runtime")
    if selected_runtime:
        skip_if_not_marked_with_runtime(selected_runtime, item)