blob: 90aa7959ea6a82968fa18bbb21bca3ea18c5203b [file] [log] [blame]
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations
import copy
import datetime
import io
import os
import re
import tempfile
import textwrap
import warnings
from collections import OrderedDict
from unittest import mock
import pytest
from pytest import param
from airflow import configuration
from airflow.configuration import (
AirflowConfigException,
AirflowConfigParser,
conf,
default_config_yaml,
expand_env_var,
get_airflow_config,
get_airflow_home,
parameterized_config,
run_command,
)
from tests.test_utils.config import conf_vars
from tests.test_utils.reset_warning_registry import reset_warning_registry
from tests.utils.test_config import (
remove_all_configurations,
set_deprecated_options,
set_sensitive_config_values,
use_config,
)
HOME_DIR = os.path.expanduser("~")
@pytest.fixture(scope="module", autouse=True)
def restore_env():
with mock.patch.dict("os.environ"):
yield
@mock.patch.dict(
"os.environ",
{
"AIRFLOW__TESTSECTION__TESTKEY": "testvalue",
"AIRFLOW__CORE__FERNET_KEY": "testvalue",
"AIRFLOW__TESTSECTION__TESTPERCENT": "with%percent",
"AIRFLOW__TESTCMDENV__ITSACOMMAND_CMD": 'echo -n "OK"',
"AIRFLOW__TESTCMDENV__NOTACOMMAND_CMD": 'echo -n "NOT OK"',
},
)
class TestConf:
def test_airflow_home_default(self):
with mock.patch.dict("os.environ"):
if "AIRFLOW_HOME" in os.environ:
del os.environ["AIRFLOW_HOME"]
assert get_airflow_home() == expand_env_var("~/airflow")
def test_airflow_home_override(self):
with mock.patch.dict("os.environ", AIRFLOW_HOME="/path/to/airflow"):
assert get_airflow_home() == "/path/to/airflow"
def test_airflow_config_default(self):
with mock.patch.dict("os.environ"):
if "AIRFLOW_CONFIG" in os.environ:
del os.environ["AIRFLOW_CONFIG"]
assert get_airflow_config("/home/airflow") == expand_env_var("/home/airflow/airflow.cfg")
def test_airflow_config_override(self):
with mock.patch.dict("os.environ", AIRFLOW_CONFIG="/path/to/airflow/airflow.cfg"):
assert get_airflow_config("/home//airflow") == "/path/to/airflow/airflow.cfg"
@conf_vars({("core", "percent"): "with%%inside"})
def test_case_sensitivity(self):
# section and key are case insensitive for get method
# note: this is not the case for as_dict method
assert conf.get("core", "percent") == "with%inside"
assert conf.get("core", "PERCENT") == "with%inside"
assert conf.get("CORE", "PERCENT") == "with%inside"
def test_config_as_dict(self):
"""Test that getting config as dict works even if
environment has non-legal env vars"""
with mock.patch.dict("os.environ"):
os.environ["AIRFLOW__VAR__broken"] = "not_ok"
asdict = conf.as_dict(raw=True, display_sensitive=True)
assert asdict.get("VAR") is None
assert asdict["testsection"]["testkey"] == "testvalue"
def test_env_var_config(self):
opt = conf.get("testsection", "testkey")
assert opt == "testvalue"
opt = conf.get("testsection", "testpercent")
assert opt == "with%percent"
assert conf.has_option("testsection", "testkey")
with mock.patch.dict(
"os.environ", AIRFLOW__KUBERNETES_ENVIRONMENT_VARIABLES__AIRFLOW__TESTSECTION__TESTKEY="nested"
):
opt = conf.get("kubernetes_environment_variables", "AIRFLOW__TESTSECTION__TESTKEY")
assert opt == "nested"
@mock.patch.dict(
"os.environ", AIRFLOW__KUBERNETES_ENVIRONMENT_VARIABLES__AIRFLOW__TESTSECTION__TESTKEY="nested"
)
@conf_vars({("core", "percent"): "with%%inside"})
def test_conf_as_dict(self):
cfg_dict = conf.as_dict()
# test that configs are picked up
assert cfg_dict["core"]["unit_test_mode"] == "True"
assert cfg_dict["core"]["percent"] == "with%inside"
# test env vars
assert cfg_dict["testsection"]["testkey"] == "testvalue"
assert cfg_dict["kubernetes_environment_variables"]["AIRFLOW__TESTSECTION__TESTKEY"] == "nested"
def test_conf_as_dict_source(self):
# test display_source
cfg_dict = conf.as_dict(display_source=True)
assert cfg_dict["core"]["load_examples"][1] == "airflow.cfg"
assert cfg_dict["database"]["load_default_connections"][1] == "airflow.cfg"
assert cfg_dict["testsection"]["testkey"] == ("testvalue", "env var")
assert cfg_dict["core"]["fernet_key"] == ("< hidden >", "env var")
def test_conf_as_dict_sensitive(self):
# test display_sensitive
cfg_dict = conf.as_dict(display_sensitive=True)
assert cfg_dict["testsection"]["testkey"] == "testvalue"
assert cfg_dict["testsection"]["testpercent"] == "with%percent"
# test display_source and display_sensitive
cfg_dict = conf.as_dict(display_sensitive=True, display_source=True)
assert cfg_dict["testsection"]["testkey"] == ("testvalue", "env var")
@conf_vars({("core", "percent"): "with%%inside"})
def test_conf_as_dict_raw(self):
# test display_sensitive
cfg_dict = conf.as_dict(raw=True, display_sensitive=True)
assert cfg_dict["testsection"]["testkey"] == "testvalue"
# Values with '%' in them should be escaped
assert cfg_dict["testsection"]["testpercent"] == "with%%percent"
assert cfg_dict["core"]["percent"] == "with%%inside"
def test_conf_as_dict_exclude_env(self):
# test display_sensitive
cfg_dict = conf.as_dict(include_env=False, display_sensitive=True)
# Since testsection is only created from env vars, it shouldn't be
# present at all if we don't ask for env vars to be included.
assert "testsection" not in cfg_dict
def test_command_precedence(self):
test_config = """[test]
key1 = hello
key2_cmd = printf cmd_result
key3 = airflow
key4_cmd = printf key4_result
"""
test_config_default = """[test]
key1 = awesome
key2 = airflow
[another]
key6 = value6
"""
test_conf = AirflowConfigParser(default_config=parameterized_config(test_config_default))
test_conf.read_string(test_config)
test_conf.sensitive_config_values = test_conf.sensitive_config_values | {
("test", "key2"),
("test", "key4"),
}
assert "hello" == test_conf.get("test", "key1")
assert "cmd_result" == test_conf.get("test", "key2")
assert "airflow" == test_conf.get("test", "key3")
assert "key4_result" == test_conf.get("test", "key4")
assert "value6" == test_conf.get("another", "key6")
assert "hello" == test_conf.get("test", "key1", fallback="fb")
assert "value6" == test_conf.get("another", "key6", fallback="fb")
assert "fb" == test_conf.get("another", "key7", fallback="fb")
assert test_conf.getboolean("another", "key8_boolean", fallback="True") is True
assert 10 == test_conf.getint("another", "key8_int", fallback="10")
assert 1.0 == test_conf.getfloat("another", "key8_float", fallback="1")
assert test_conf.has_option("test", "key1")
assert test_conf.has_option("test", "key2")
assert test_conf.has_option("test", "key3")
assert test_conf.has_option("test", "key4")
assert not test_conf.has_option("test", "key5")
assert test_conf.has_option("another", "key6")
cfg_dict = test_conf.as_dict(display_sensitive=True)
assert "cmd_result" == cfg_dict["test"]["key2"]
assert "key2_cmd" not in cfg_dict["test"]
# If we exclude _cmds then we should still see the commands to run, not
# their values
cfg_dict = test_conf.as_dict(include_cmds=False, display_sensitive=True)
assert "key4" not in cfg_dict["test"]
assert "printf key4_result" == cfg_dict["test"]["key4_cmd"]
def test_can_read_dot_section(self):
test_config = """[test.abc]
key1 = true
"""
test_conf = AirflowConfigParser()
test_conf.read_string(test_config)
section = "test.abc"
key = "key1"
assert test_conf.getboolean(section, key) is True
with mock.patch.dict(
"os.environ",
{
"AIRFLOW__TEST_ABC__KEY1": "false", # note that the '.' is converted to '_'
},
):
assert test_conf.getboolean(section, key) is False
@mock.patch("airflow.providers.hashicorp._internal_client.vault_client.hvac")
@conf_vars(
{
("secrets", "backend"): "airflow.providers.hashicorp.secrets.vault.VaultBackend",
("secrets", "backend_kwargs"): '{"url": "http://127.0.0.1:8200", "token": "token"}',
}
)
def test_config_from_secret_backend(self, mock_hvac):
"""Get Config Value from a Secret Backend"""
mock_client = mock.MagicMock()
mock_hvac.Client.return_value = mock_client
mock_client.secrets.kv.v2.read_secret_version.return_value = {
"request_id": "2d48a2ad-6bcb-e5b6-429d-da35fdf31f56",
"lease_id": "",
"renewable": False,
"lease_duration": 0,
"data": {
"data": {"value": "sqlite:////Users/airflow/airflow/airflow.db"},
"metadata": {
"created_time": "2020-03-28T02:10:54.301784Z",
"deletion_time": "",
"destroyed": False,
"version": 1,
},
},
"wrap_info": None,
"warnings": None,
"auth": None,
}
test_config = """[test]
sql_alchemy_conn_secret = sql_alchemy_conn
"""
test_config_default = """[test]
sql_alchemy_conn = airflow
"""
test_conf = AirflowConfigParser(default_config=parameterized_config(test_config_default))
test_conf.read_string(test_config)
test_conf.sensitive_config_values = test_conf.sensitive_config_values | {
("test", "sql_alchemy_conn"),
}
assert "sqlite:////Users/airflow/airflow/airflow.db" == test_conf.get("test", "sql_alchemy_conn")
def test_hidding_of_sensitive_config_values(self):
test_config = """[test]
sql_alchemy_conn_secret = sql_alchemy_conn
"""
test_config_default = """[test]
sql_alchemy_conn = airflow
"""
test_conf = AirflowConfigParser(default_config=parameterized_config(test_config_default))
test_conf.read_string(test_config)
test_conf.sensitive_config_values = test_conf.sensitive_config_values | {
("test", "sql_alchemy_conn"),
}
assert "airflow" == test_conf.get("test", "sql_alchemy_conn")
# Hide sensitive fields
asdict = test_conf.as_dict(display_sensitive=False)
assert "< hidden >" == asdict["test"]["sql_alchemy_conn"]
# If display_sensitive is false, then include_cmd, include_env,include_secrets must all be True
# This ensures that cmd and secrets env are hidden at the appropriate method and no surprises
with pytest.raises(ValueError):
test_conf.as_dict(display_sensitive=False, include_cmds=False)
# Test that one of include_cmds, include_env, include_secret can be false when display_sensitive
# is True
assert test_conf.as_dict(display_sensitive=True, include_cmds=False)
@mock.patch("airflow.providers.hashicorp._internal_client.vault_client.hvac")
@conf_vars(
{
("secrets", "backend"): "airflow.providers.hashicorp.secrets.vault.VaultBackend",
("secrets", "backend_kwargs"): '{"url": "http://127.0.0.1:8200", "token": "token"}',
}
)
def test_config_raise_exception_from_secret_backend_connection_error(self, mock_hvac):
"""Get Config Value from a Secret Backend"""
mock_client = mock.MagicMock()
# mock_client.side_effect = AirflowConfigException
mock_hvac.Client.return_value = mock_client
mock_client.secrets.kv.v2.read_secret_version.return_value = Exception
test_config = """[test]
sql_alchemy_conn_secret = sql_alchemy_conn
"""
test_config_default = """[test]
sql_alchemy_conn = airflow
"""
test_conf = AirflowConfigParser(default_config=parameterized_config(test_config_default))
test_conf.read_string(test_config)
test_conf.sensitive_config_values = test_conf.sensitive_config_values | {
("test", "sql_alchemy_conn"),
}
with pytest.raises(
AirflowConfigException,
match=re.escape(
"Cannot retrieve config from alternative secrets backend. "
"Make sure it is configured properly and that the Backend "
"is accessible."
),
):
test_conf.get("test", "sql_alchemy_conn")
def test_getboolean(self):
"""Test AirflowConfigParser.getboolean"""
test_config = """
[type_validation]
key1 = non_bool_value
[true]
key2 = t
key3 = true
key4 = 1
[false]
key5 = f
key6 = false
key7 = 0
[inline-comment]
key8 = true #123
"""
test_conf = AirflowConfigParser(default_config=test_config)
with pytest.raises(
AirflowConfigException,
match=re.escape(
'Failed to convert value to bool. Please check "key1" key in "type_validation" section. '
'Current value: "non_bool_value".'
),
):
test_conf.getboolean("type_validation", "key1")
assert isinstance(test_conf.getboolean("true", "key3"), bool)
assert test_conf.getboolean("true", "key2") is True
assert test_conf.getboolean("true", "key3") is True
assert test_conf.getboolean("true", "key4") is True
assert test_conf.getboolean("false", "key5") is False
assert test_conf.getboolean("false", "key6") is False
assert test_conf.getboolean("false", "key7") is False
assert test_conf.getboolean("inline-comment", "key8") is True
def test_getint(self):
"""Test AirflowConfigParser.getint"""
test_config = """
[invalid]
key1 = str
[valid]
key2 = 1
"""
test_conf = AirflowConfigParser(default_config=test_config)
with pytest.raises(
AirflowConfigException,
match=re.escape(
'Failed to convert value to int. Please check "key1" key in "invalid" section. '
'Current value: "str".'
),
):
test_conf.getint("invalid", "key1")
assert isinstance(test_conf.getint("valid", "key2"), int)
assert 1 == test_conf.getint("valid", "key2")
def test_getfloat(self):
"""Test AirflowConfigParser.getfloat"""
test_config = """
[invalid]
key1 = str
[valid]
key2 = 1.23
"""
test_conf = AirflowConfigParser(default_config=test_config)
with pytest.raises(
AirflowConfigException,
match=re.escape(
'Failed to convert value to float. Please check "key1" key in "invalid" section. '
'Current value: "str".'
),
):
test_conf.getfloat("invalid", "key1")
assert isinstance(test_conf.getfloat("valid", "key2"), float)
assert 1.23 == test_conf.getfloat("valid", "key2")
@pytest.mark.parametrize(
("config_str", "expected"),
[
pytest.param('{"a": 123}', {"a": 123}, id="dict"),
pytest.param("[1,2,3]", [1, 2, 3], id="list"),
pytest.param('"abc"', "abc", id="str"),
pytest.param("2.1", 2.1, id="num"),
pytest.param("", None, id="empty"),
],
)
def test_getjson(self, config_str, expected):
config = textwrap.dedent(
f"""
[test]
json = {config_str}
"""
)
test_conf = AirflowConfigParser()
test_conf.read_string(config)
assert test_conf.getjson("test", "json") == expected
def test_getjson_empty_with_fallback(self):
config = textwrap.dedent(
"""
[test]
json =
"""
)
test_conf = AirflowConfigParser()
test_conf.read_string(config)
assert test_conf.getjson("test", "json", fallback={}) == {}
assert test_conf.getjson("test", "json") is None
@pytest.mark.parametrize(
("fallback"),
[
pytest.param({"a": "b"}, id="dict"),
# fallback is _NOT_ json parsed, but used verbatim
pytest.param('{"a": "b"}', id="str"),
pytest.param(None, id="None"),
],
)
def test_getjson_fallback(self, fallback):
test_conf = AirflowConfigParser()
assert test_conf.getjson("test", "json", fallback=fallback) == fallback
def test_has_option(self):
test_config = """[test]
key1 = value1
"""
test_conf = AirflowConfigParser()
test_conf.read_string(test_config)
assert test_conf.has_option("test", "key1")
assert not test_conf.has_option("test", "key_not_exists")
assert not test_conf.has_option("section_not_exists", "key1")
def test_remove_option(self):
test_config = """[test]
key1 = hello
key2 = airflow
"""
test_config_default = """[test]
key1 = awesome
key2 = airflow
"""
test_conf = AirflowConfigParser(default_config=parameterized_config(test_config_default))
test_conf.read_string(test_config)
assert "hello" == test_conf.get("test", "key1")
test_conf.remove_option("test", "key1", remove_default=False)
assert "awesome" == test_conf.get("test", "key1")
test_conf.remove_option("test", "key2")
assert not test_conf.has_option("test", "key2")
def test_getsection(self):
test_config = """
[test]
key1 = hello
[new_section]
key = value
"""
test_config_default = """
[test]
key1 = awesome
key2 = airflow
[testsection]
key3 = value3
"""
test_conf = AirflowConfigParser(default_config=parameterized_config(test_config_default))
test_conf.read_string(test_config)
assert OrderedDict([("key1", "hello"), ("key2", "airflow")]) == test_conf.getsection("test")
assert OrderedDict(
[("key3", "value3"), ("testkey", "testvalue"), ("testpercent", "with%percent")]
) == test_conf.getsection("testsection")
assert OrderedDict([("key", "value")]) == test_conf.getsection("new_section")
assert test_conf.getsection("non_existent_section") is None
def test_get_section_should_respect_cmd_env_variable(self):
with tempfile.NamedTemporaryFile(delete=False) as cmd_file:
cmd_file.write(b"#!/usr/bin/env bash\n")
cmd_file.write(b"echo -n difficult_unpredictable_cat_password\n")
cmd_file.flush()
os.chmod(cmd_file.name, 0o0555)
cmd_file.close()
with mock.patch.dict("os.environ", {"AIRFLOW__WEBSERVER__SECRET_KEY_CMD": cmd_file.name}):
content = conf.getsection("webserver")
os.unlink(cmd_file.name)
assert content["secret_key"] == "difficult_unpredictable_cat_password"
def test_kubernetes_environment_variables_section(self):
test_config = """
[kubernetes_environment_variables]
key1 = hello
AIRFLOW_HOME = /root/airflow
"""
test_config_default = """
[kubernetes_environment_variables]
"""
test_conf = AirflowConfigParser(default_config=parameterized_config(test_config_default))
test_conf.read_string(test_config)
assert OrderedDict([("key1", "hello"), ("AIRFLOW_HOME", "/root/airflow")]) == test_conf.getsection(
"kubernetes_environment_variables"
)
def test_broker_transport_options(self):
section_dict = conf.getsection("celery_broker_transport_options")
assert isinstance(section_dict["visibility_timeout"], int)
assert isinstance(section_dict["_test_only_bool"], bool)
assert isinstance(section_dict["_test_only_float"], float)
assert isinstance(section_dict["_test_only_string"], str)
def test_auth_backends_adds_session(self):
test_conf = AirflowConfigParser(default_config="")
# Guarantee we have deprecated settings, so we test the deprecation
# lookup even if we remove this explicit fallback
test_conf.deprecated_values = {
"api": {
"auth_backends": (
re.compile(r"^airflow\.api\.auth\.backend\.deny_all$|^$"),
"airflow.api.auth.backend.session",
"3.0",
),
},
}
test_conf.read_dict({"api": {"auth_backends": "airflow.api.auth.backend.basic_auth"}})
with pytest.warns(FutureWarning):
test_conf.validate()
assert (
test_conf.get("api", "auth_backends")
== "airflow.api.auth.backend.basic_auth,airflow.api.auth.backend.session"
)
def test_command_from_env(self):
test_cmdenv_config = """[testcmdenv]
itsacommand = NOT OK
notacommand = OK
"""
test_cmdenv_conf = AirflowConfigParser()
test_cmdenv_conf.read_string(test_cmdenv_config)
test_cmdenv_conf.sensitive_config_values.add(("testcmdenv", "itsacommand"))
with mock.patch.dict("os.environ"):
# AIRFLOW__TESTCMDENV__ITSACOMMAND_CMD maps to ('testcmdenv', 'itsacommand') in
# sensitive_config_values and therefore should return 'OK' from the environment variable's
# echo command, and must not return 'NOT OK' from the configuration
assert test_cmdenv_conf.get("testcmdenv", "itsacommand") == "OK"
# AIRFLOW__TESTCMDENV__NOTACOMMAND_CMD maps to no entry in sensitive_config_values and therefore
# the option should return 'OK' from the configuration, and must not return 'NOT OK' from
# the environment variable's echo command
assert test_cmdenv_conf.get("testcmdenv", "notacommand") == "OK"
@pytest.mark.parametrize("display_sensitive, result", [(True, "OK"), (False, "< hidden >")])
def test_as_dict_display_sensitivewith_command_from_env(self, display_sensitive, result):
test_cmdenv_conf = AirflowConfigParser()
test_cmdenv_conf.sensitive_config_values.add(("testcmdenv", "itsacommand"))
with mock.patch.dict("os.environ"):
asdict = test_cmdenv_conf.as_dict(True, display_sensitive)
assert asdict["testcmdenv"]["itsacommand"] == (result, "cmd")
def test_parameterized_config_gen(self):
config = textwrap.dedent(
"""
[core]
dags_folder = {AIRFLOW_HOME}/dags
sql_alchemy_conn = sqlite:///{AIRFLOW_HOME}/airflow.db
parallelism = 32
fernet_key = {FERNET_KEY}
"""
)
cfg = parameterized_config(config)
# making sure some basic building blocks are present:
assert "[core]" in cfg
assert "dags_folder" in cfg
assert "sql_alchemy_conn" in cfg
assert "fernet_key" in cfg
# making sure replacement actually happened
assert "{AIRFLOW_HOME}" not in cfg
assert "{FERNET_KEY}" not in cfg
def test_config_use_original_when_original_and_fallback_are_present(self):
assert conf.has_option("core", "FERNET_KEY")
assert not conf.has_option("core", "FERNET_KEY_CMD")
fernet_key = conf.get("core", "FERNET_KEY")
with conf_vars({("core", "FERNET_KEY_CMD"): "printf HELLO"}):
fallback_fernet_key = conf.get("core", "FERNET_KEY")
assert fernet_key == fallback_fernet_key
def test_config_throw_error_when_original_and_fallback_is_absent(self):
assert conf.has_option("core", "FERNET_KEY")
assert not conf.has_option("core", "FERNET_KEY_CMD")
with conf_vars({("core", "fernet_key"): None}):
with pytest.raises(AirflowConfigException) as ctx:
conf.get("core", "FERNET_KEY")
exception = str(ctx.value)
message = "section/key [core/fernet_key] not found in config"
assert message == exception
def test_config_override_original_when_non_empty_envvar_is_provided(self):
key = "AIRFLOW__CORE__FERNET_KEY"
value = "some value"
with mock.patch.dict("os.environ", {key: value}):
fernet_key = conf.get("core", "FERNET_KEY")
assert value == fernet_key
def test_config_override_original_when_empty_envvar_is_provided(self):
key = "AIRFLOW__CORE__FERNET_KEY"
value = "some value"
with mock.patch.dict("os.environ", {key: value}):
fernet_key = conf.get("core", "FERNET_KEY")
assert value == fernet_key
@mock.patch.dict("os.environ", {"AIRFLOW__CORE__DAGS_FOLDER": "/tmp/test_folder"})
def test_write_should_respect_env_variable(self):
with io.StringIO() as string_file:
conf.write(string_file)
content = string_file.getvalue()
assert "dags_folder = /tmp/test_folder" in content
def test_run_command(self):
write = r'sys.stdout.buffer.write("\u1000foo".encode("utf8"))'
cmd = f"import sys; {write}; sys.stdout.flush()"
assert run_command(f"python -c '{cmd}'") == "\u1000foo"
assert run_command('echo "foo bar"') == "foo bar\n"
with pytest.raises(AirflowConfigException):
run_command('bash -c "exit 1"')
def test_confirm_unittest_mod(self):
assert conf.get("core", "unit_test_mode")
def test_enum_default_task_weight_rule_from_conf(self):
test_conf = AirflowConfigParser(default_config="")
test_conf.read_dict({"core": {"default_task_weight_rule": "sidestream"}})
with pytest.raises(AirflowConfigException) as ctx:
test_conf.validate()
exception = str(ctx.value)
message = (
"`[core] default_task_weight_rule` should not be 'sidestream'. Possible values: "
"absolute, downstream, upstream."
)
assert message == exception
def test_enum_logging_levels(self):
test_conf = AirflowConfigParser(default_config="")
test_conf.read_dict({"logging": {"logging_level": "XXX"}})
with pytest.raises(AirflowConfigException) as ctx:
test_conf.validate()
exception = str(ctx.value)
message = (
"`[logging] logging_level` should not be 'XXX'. Possible values: "
"CRITICAL, FATAL, ERROR, WARN, WARNING, INFO, DEBUG."
)
assert message == exception
def test_as_dict_works_without_sensitive_cmds(self):
conf_materialize_cmds = conf.as_dict(display_sensitive=True, raw=True, include_cmds=True)
conf_maintain_cmds = conf.as_dict(display_sensitive=True, raw=True, include_cmds=False)
assert "sql_alchemy_conn" in conf_materialize_cmds["core"]
assert "sql_alchemy_conn_cmd" not in conf_materialize_cmds["core"]
assert "sql_alchemy_conn" in conf_maintain_cmds["core"]
assert "sql_alchemy_conn_cmd" not in conf_maintain_cmds["core"]
assert (
conf_materialize_cmds["core"]["sql_alchemy_conn"]
== conf_maintain_cmds["core"]["sql_alchemy_conn"]
)
def test_as_dict_respects_sensitive_cmds(self):
conf_conn = conf["database"]["sql_alchemy_conn"]
test_conf = copy.deepcopy(conf)
test_conf.read_string(
textwrap.dedent(
"""
[database]
sql_alchemy_conn_cmd = echo -n my-super-secret-conn
"""
)
)
conf_materialize_cmds = test_conf.as_dict(display_sensitive=True, raw=True, include_cmds=True)
conf_maintain_cmds = test_conf.as_dict(display_sensitive=True, raw=True, include_cmds=False)
assert "sql_alchemy_conn" in conf_materialize_cmds["database"]
assert "sql_alchemy_conn_cmd" not in conf_materialize_cmds["database"]
if conf_conn == test_conf.airflow_defaults["database"]["sql_alchemy_conn"]:
assert conf_materialize_cmds["database"]["sql_alchemy_conn"] == "my-super-secret-conn"
assert "sql_alchemy_conn_cmd" in conf_maintain_cmds["database"]
assert conf_maintain_cmds["database"]["sql_alchemy_conn_cmd"] == "echo -n my-super-secret-conn"
if conf_conn == test_conf.airflow_defaults["database"]["sql_alchemy_conn"]:
assert "sql_alchemy_conn" not in conf_maintain_cmds["database"]
else:
assert "sql_alchemy_conn" in conf_maintain_cmds["database"]
assert conf_maintain_cmds["database"]["sql_alchemy_conn"] == conf_conn
@mock.patch.dict(
"os.environ", {"AIRFLOW__DATABASE__SQL_ALCHEMY_CONN_CMD": "echo -n 'postgresql://'"}, clear=True
)
def test_as_dict_respects_sensitive_cmds_from_env(self):
test_conf = copy.deepcopy(conf)
test_conf.read_string("")
conf_materialize_cmds = test_conf.as_dict(display_sensitive=True, raw=True, include_cmds=True)
assert "sql_alchemy_conn" in conf_materialize_cmds["database"]
assert "sql_alchemy_conn_cmd" not in conf_materialize_cmds["database"]
assert conf_materialize_cmds["database"]["sql_alchemy_conn"] == "postgresql://"
def test_gettimedelta(self):
test_config = """
[invalid]
# non-integer value
key1 = str
# fractional value
key2 = 300.99
# too large value for C int
key3 = 999999999999999
[valid]
# negative value
key4 = -1
# zero
key5 = 0
# positive value
key6 = 300
[default]
# Equals to None
key7 =
"""
test_conf = AirflowConfigParser(default_config=test_config)
with pytest.raises(
AirflowConfigException,
match=re.escape(
'Failed to convert value to int. Please check "key1" key in "invalid" section. '
'Current value: "str".'
),
):
test_conf.gettimedelta("invalid", "key1")
with pytest.raises(
AirflowConfigException,
match=re.escape(
'Failed to convert value to int. Please check "key2" key in "invalid" section. '
'Current value: "300.99".'
),
):
test_conf.gettimedelta("invalid", "key2")
with pytest.raises(
AirflowConfigException,
match=re.escape(
"Failed to convert value to timedelta in `seconds`. "
"Python int too large to convert to C int. "
'Please check "key3" key in "invalid" section. Current value: "999999999999999".'
),
):
test_conf.gettimedelta("invalid", "key3")
assert isinstance(test_conf.gettimedelta("valid", "key4"), datetime.timedelta)
assert test_conf.gettimedelta("valid", "key4") == datetime.timedelta(seconds=-1)
assert isinstance(test_conf.gettimedelta("valid", "key5"), datetime.timedelta)
assert test_conf.gettimedelta("valid", "key5") == datetime.timedelta(seconds=0)
assert isinstance(test_conf.gettimedelta("valid", "key6"), datetime.timedelta)
assert test_conf.gettimedelta("valid", "key6") == datetime.timedelta(seconds=300)
assert isinstance(test_conf.gettimedelta("default", "key7"), type(None))
assert test_conf.gettimedelta("default", "key7") is None
class TestDeprecatedConf:
@conf_vars(
{
("celery", "worker_concurrency"): None,
("celery", "celeryd_concurrency"): None,
}
)
def test_deprecated_options(self):
# Guarantee we have a deprecated setting, so we test the deprecation
# lookup even if we remove this explicit fallback
with set_deprecated_options(
deprecated_options={("celery", "worker_concurrency"): ("celery", "celeryd_concurrency", "2.0.0")}
):
# Remove it so we are sure we use the right setting
conf.remove_option("celery", "worker_concurrency")
with pytest.warns(DeprecationWarning):
with mock.patch.dict("os.environ", AIRFLOW__CELERY__CELERYD_CONCURRENCY="99"):
assert conf.getint("celery", "worker_concurrency") == 99
with pytest.warns(DeprecationWarning), conf_vars({("celery", "celeryd_concurrency"): "99"}):
assert conf.getint("celery", "worker_concurrency") == 99
@conf_vars(
{
("logging", "logging_level"): None,
("core", "logging_level"): None,
}
)
def test_deprecated_options_with_new_section(self):
# Guarantee we have a deprecated setting, so we test the deprecation
# lookup even if we remove this explicit fallback
with set_deprecated_options(
deprecated_options={("logging", "logging_level"): ("core", "logging_level", "2.0.0")}
):
# Remove it so we are sure we use the right setting
conf.remove_option("core", "logging_level")
conf.remove_option("logging", "logging_level")
with pytest.warns(DeprecationWarning):
with mock.patch.dict("os.environ", AIRFLOW__CORE__LOGGING_LEVEL="VALUE"):
assert conf.get("logging", "logging_level") == "VALUE"
with pytest.warns(FutureWarning, match="Please update your `conf.get"):
with mock.patch.dict("os.environ", AIRFLOW__CORE__LOGGING_LEVEL="VALUE"):
assert conf.get("core", "logging_level") == "VALUE"
with pytest.warns(DeprecationWarning), conf_vars({("core", "logging_level"): "VALUE"}):
assert conf.get("logging", "logging_level") == "VALUE"
@conf_vars(
{
("celery", "result_backend"): None,
("celery", "celery_result_backend"): None,
("celery", "celery_result_backend_cmd"): None,
}
)
def test_deprecated_options_cmd(self):
# Guarantee we have a deprecated setting, so we test the deprecation
# lookup even if we remove this explicit fallback
with set_deprecated_options(
deprecated_options={("celery", "result_backend"): ("celery", "celery_result_backend", "2.0.0")}
), set_sensitive_config_values(sensitive_config_values={("celery", "celery_result_backend")}):
conf.remove_option("celery", "result_backend")
with conf_vars({("celery", "celery_result_backend_cmd"): "/bin/echo 99"}):
with pytest.warns(DeprecationWarning):
tmp = None
if "AIRFLOW__CELERY__RESULT_BACKEND" in os.environ:
tmp = os.environ.pop("AIRFLOW__CELERY__RESULT_BACKEND")
assert conf.getint("celery", "result_backend") == 99
if tmp:
os.environ["AIRFLOW__CELERY__RESULT_BACKEND"] = tmp
def test_deprecated_values_from_conf(self):
test_conf = AirflowConfigParser(
default_config="""
[core]
executor=SequentialExecutor
[database]
sql_alchemy_conn=sqlite://test
"""
)
# Guarantee we have deprecated settings, so we test the deprecation
# lookup even if we remove this explicit fallback
test_conf.deprecated_values = {
"core": {"hostname_callable": (re.compile(r":"), r".", "2.1")},
}
test_conf.read_dict({"core": {"hostname_callable": "airflow.utils.net:getfqdn"}})
with pytest.warns(FutureWarning):
test_conf.validate()
assert test_conf.get("core", "hostname_callable") == "airflow.utils.net.getfqdn"
@pytest.mark.parametrize(
"old, new",
[
(
("api", "auth_backend", "airflow.api.auth.backend.basic_auth"),
(
"api",
"auth_backends",
"airflow.api.auth.backend.basic_auth,airflow.api.auth.backend.session",
),
),
(
("core", "sql_alchemy_conn", "postgres+psycopg2://localhost/postgres"),
("database", "sql_alchemy_conn", "postgresql://localhost/postgres"),
),
],
)
def test_deprecated_env_vars_upgraded_and_removed(self, old, new):
test_conf = AirflowConfigParser(
default_config="""
[core]
executor=SequentialExecutor
[database]
sql_alchemy_conn=sqlite://test
"""
)
old_section, old_key, old_value = old
new_section, new_key, new_value = new
old_env_var = test_conf._env_var_name(old_section, old_key)
new_env_var = test_conf._env_var_name(new_section, new_key)
with pytest.warns(FutureWarning):
with mock.patch.dict("os.environ", **{old_env_var: old_value}):
# Can't start with the new env var existing...
os.environ.pop(new_env_var, None)
test_conf.validate()
assert test_conf.get(new_section, new_key) == new_value
# We also need to make sure the deprecated env var is removed
# so that any subprocesses don't use it in place of our updated
# value.
assert old_env_var not in os.environ
# and make sure we track the old value as well, under the new section/key
assert test_conf.upgraded_values[(new_section, new_key)] == old_value
@pytest.mark.parametrize(
"conf_dict",
[
{}, # Even if the section is absent from config file, environ still needs replacing.
{"core": {"hostname_callable": "airflow.utils.net.getfqdn"}},
],
)
def test_deprecated_values_from_environ(self, conf_dict):
def make_config():
test_conf = AirflowConfigParser(
default_config="""
[core]
executor=SequentialExecutor
[database]
sql_alchemy_conn=sqlite://test
"""
)
# Guarantee we have a deprecated setting, so we test the deprecation
# lookup even if we remove this explicit fallback
test_conf.deprecated_values = {
"core": {"hostname_callable": (re.compile(r":"), r".", "2.1")},
}
test_conf.read_dict(conf_dict)
test_conf.validate()
return test_conf
with pytest.warns(FutureWarning):
with mock.patch.dict("os.environ", AIRFLOW__CORE__HOSTNAME_CALLABLE="airflow.utils.net:getfqdn"):
test_conf = make_config()
assert test_conf.get("core", "hostname_callable") == "airflow.utils.net.getfqdn"
with reset_warning_registry():
with warnings.catch_warnings(record=True) as warning:
with mock.patch.dict(
"os.environ",
AIRFLOW__CORE__HOSTNAME_CALLABLE="CarrierPigeon",
):
test_conf = make_config()
assert test_conf.get("core", "hostname_callable") == "CarrierPigeon"
assert [] == warning
@pytest.mark.parametrize(
("conf_dict", "environ", "expected"),
[
pytest.param({"old_section": {"val": "old_val"}}, None, "old_val", id="old_config"),
pytest.param(
{"old_section": {"val": "old_val"}},
("AIRFLOW__OLD_SECTION__VAL", "old_env"),
"old_env",
id="old_config_old_env",
),
pytest.param(
{},
("AIRFLOW__OLD_SECTION__VAL", "old_env"),
"old_env",
id="old_env",
),
pytest.param(
{"new_section": {"val": "val2"}},
("AIRFLOW__OLD_SECTION__VAL", "old_env"),
"old_env",
id="new_config_old_env",
),
],
)
def test_deprecated_sections(self, conf_dict, environ, expected, monkeypatch):
def make_config():
test_conf = AirflowConfigParser(
default_config=textwrap.dedent(
"""
[new_section]
val=new
"""
)
)
# Guarantee we have a deprecated setting, so we test the deprecation
# lookup even if we remove this explicit fallback
test_conf.deprecated_sections = {
"new_section": ("old_section", "2.1"),
}
test_conf.read_dict(conf_dict)
test_conf.validate()
return test_conf
if environ:
monkeypatch.setenv(*environ)
test_conf = make_config()
with pytest.warns(
DeprecationWarning,
match=r"\[old_section\] has been moved to the val option in \[new_section\].*update your config",
):
# Test when you've _set_ the old value that we warn you need to update your config
assert test_conf.get("new_section", "val") == expected
with pytest.warns(
FutureWarning,
match=r"\[old_section\] has been renamed to \[new_section\].*update your `conf.get",
):
# Test when you read using the old section you get told to change your `conf.get` call
assert test_conf.get("old_section", "val") == expected
def test_deprecated_funcs(self):
for func in [
"load_test_config",
"get",
"getboolean",
"getfloat",
"getint",
"has_option",
"remove_option",
"as_dict",
"set",
]:
with mock.patch(f"airflow.configuration.conf.{func}") as mock_method:
with pytest.warns(DeprecationWarning):
getattr(configuration, func)()
mock_method.assert_called_once()
@pytest.mark.parametrize("display_source", [True, False])
@mock.patch.dict("os.environ", {}, clear=True)
def test_conf_as_dict_when_deprecated_value_in_config(self, display_source: bool):
with use_config(config="deprecated.cfg"):
cfg_dict = conf.as_dict(
display_source=display_source,
raw=True,
display_sensitive=True,
include_env=False,
include_cmds=False,
)
assert cfg_dict["core"].get("sql_alchemy_conn") == (
("mysql://", "airflow.cfg") if display_source else "mysql://"
)
# database should be None because the deprecated value is set in config
assert cfg_dict["database"].get("sql_alchemy_conn") is None
if not display_source:
remove_all_configurations()
conf.read_dict(dictionary=cfg_dict)
os.environ.clear()
assert conf.get("database", "sql_alchemy_conn") == "mysql://"
@pytest.mark.parametrize("display_source", [True, False])
@mock.patch.dict("os.environ", {"AIRFLOW__CORE__SQL_ALCHEMY_CONN": "postgresql://"}, clear=True)
def test_conf_as_dict_when_deprecated_value_in_both_env_and_config(self, display_source: bool):
with use_config(config="deprecated.cfg"):
cfg_dict = conf.as_dict(
display_source=display_source,
raw=True,
display_sensitive=True,
include_env=True,
include_cmds=False,
)
assert cfg_dict["core"].get("sql_alchemy_conn") == (
("postgresql://", "env var") if display_source else "postgresql://"
)
# database should be None because the deprecated value is set in env value
assert cfg_dict["database"].get("sql_alchemy_conn") is None
if not display_source:
remove_all_configurations()
conf.read_dict(dictionary=cfg_dict)
os.environ.clear()
assert conf.get("database", "sql_alchemy_conn") == "postgresql://"
@pytest.mark.parametrize("display_source", [True, False])
@mock.patch.dict("os.environ", {"AIRFLOW__CORE__SQL_ALCHEMY_CONN": "postgresql://"}, clear=True)
def test_conf_as_dict_when_deprecated_value_in_both_env_and_config_exclude_env(
self, display_source: bool
):
with use_config(config="deprecated.cfg"):
cfg_dict = conf.as_dict(
display_source=display_source,
raw=True,
display_sensitive=True,
include_env=False,
include_cmds=False,
)
assert cfg_dict["core"].get("sql_alchemy_conn") == (
("mysql://", "airflow.cfg") if display_source else "mysql://"
)
# database should be None because the deprecated value is set in env value
assert cfg_dict["database"].get("sql_alchemy_conn") is None
if not display_source:
remove_all_configurations()
conf.read_dict(dictionary=cfg_dict)
os.environ.clear()
assert conf.get("database", "sql_alchemy_conn") == "mysql://"
@pytest.mark.parametrize("display_source", [True, False])
@mock.patch.dict("os.environ", {"AIRFLOW__CORE__SQL_ALCHEMY_CONN": "postgresql://"}, clear=True)
def test_conf_as_dict_when_deprecated_value_in_env(self, display_source: bool):
with use_config(config="empty.cfg"):
cfg_dict = conf.as_dict(
display_source=display_source, raw=True, display_sensitive=True, include_env=True
)
assert cfg_dict["core"].get("sql_alchemy_conn") == (
("postgresql://", "env var") if display_source else "postgresql://"
)
# database should be None because the deprecated value is set in env value
assert cfg_dict["database"].get("sql_alchemy_conn") is None
if not display_source:
remove_all_configurations()
conf.read_dict(dictionary=cfg_dict)
os.environ.clear()
assert conf.get("database", "sql_alchemy_conn") == "postgresql://"
@pytest.mark.parametrize("display_source", [True, False])
@mock.patch.dict("os.environ", {}, clear=True)
def test_conf_as_dict_when_both_conf_and_env_are_empty(self, display_source: bool):
with use_config(config="empty.cfg"):
cfg_dict = conf.as_dict(display_source=display_source, raw=True, display_sensitive=True)
assert cfg_dict["core"].get("sql_alchemy_conn") is None
# database should be taken from default because the deprecated value is missing in config
assert cfg_dict["database"].get("sql_alchemy_conn") == (
(f"sqlite:///{HOME_DIR}/airflow/airflow.db", "default")
if display_source
else f"sqlite:///{HOME_DIR}/airflow/airflow.db"
)
if not display_source:
remove_all_configurations()
conf.read_dict(dictionary=cfg_dict)
os.environ.clear()
assert conf.get("database", "sql_alchemy_conn") == f"sqlite:///{HOME_DIR}/airflow/airflow.db"
@pytest.mark.parametrize("display_source", [True, False])
@mock.patch.dict("os.environ", {}, clear=True)
def test_conf_as_dict_when_deprecated_value_in_cmd_config(self, display_source: bool):
with use_config(config="deprecated_cmd.cfg"):
cfg_dict = conf.as_dict(
display_source=display_source,
raw=True,
display_sensitive=True,
include_env=True,
include_cmds=True,
)
assert cfg_dict["core"].get("sql_alchemy_conn") == (
("postgresql://", "cmd") if display_source else "postgresql://"
)
# database should be None because the deprecated value is set in env value
assert cfg_dict["database"].get("sql_alchemy_conn") is None
if not display_source:
remove_all_configurations()
conf.read_dict(dictionary=cfg_dict)
os.environ.clear()
assert conf.get("database", "sql_alchemy_conn") == "postgresql://"
@pytest.mark.parametrize("display_source", [True, False])
@mock.patch.dict(
"os.environ", {"AIRFLOW__CORE__SQL_ALCHEMY_CONN_CMD": "echo -n 'postgresql://'"}, clear=True
)
def test_conf_as_dict_when_deprecated_value_in_cmd_env(self, display_source: bool):
with use_config(config="empty.cfg"):
cfg_dict = conf.as_dict(
display_source=display_source,
raw=True,
display_sensitive=True,
include_env=True,
include_cmds=True,
)
assert cfg_dict["core"].get("sql_alchemy_conn") == (
("postgresql://", "cmd") if display_source else "postgresql://"
)
# database should be None because the deprecated value is set in env value
assert cfg_dict["database"].get("sql_alchemy_conn") is None
if not display_source:
remove_all_configurations()
conf.read_dict(dictionary=cfg_dict)
os.environ.clear()
assert conf.get("database", "sql_alchemy_conn") == "postgresql://"
@pytest.mark.parametrize("display_source", [True, False])
@mock.patch.dict(
"os.environ", {"AIRFLOW__CORE__SQL_ALCHEMY_CONN_CMD": "echo -n 'postgresql://'"}, clear=True
)
def test_conf_as_dict_when_deprecated_value_in_cmd_disabled_env(self, display_source: bool):
with use_config(config="empty.cfg"):
cfg_dict = conf.as_dict(
display_source=display_source,
raw=True,
display_sensitive=True,
include_env=True,
include_cmds=False,
)
assert cfg_dict["core"].get("sql_alchemy_conn") is None
assert cfg_dict["database"].get("sql_alchemy_conn") == (
(f"sqlite:///{HOME_DIR}/airflow/airflow.db", "default")
if display_source
else f"sqlite:///{HOME_DIR}/airflow/airflow.db"
)
if not display_source:
remove_all_configurations()
conf.read_dict(dictionary=cfg_dict)
os.environ.clear()
assert conf.get("database", "sql_alchemy_conn") == f"sqlite:///{HOME_DIR}/airflow/airflow.db"
@pytest.mark.parametrize("display_source", [True, False])
@mock.patch.dict("os.environ", {}, clear=True)
def test_conf_as_dict_when_deprecated_value_in_cmd_disabled_config(self, display_source: bool):
with use_config(config="deprecated_cmd.cfg"):
cfg_dict = conf.as_dict(
display_source=display_source,
raw=True,
display_sensitive=True,
include_env=True,
include_cmds=False,
)
assert cfg_dict["core"].get("sql_alchemy_conn") is None
assert cfg_dict["database"].get("sql_alchemy_conn") == (
(f"sqlite:///{HOME_DIR}/airflow/airflow.db", "default")
if display_source
else f"sqlite:///{HOME_DIR}/airflow/airflow.db"
)
if not display_source:
remove_all_configurations()
conf.read_dict(dictionary=cfg_dict)
os.environ.clear()
assert conf.get("database", "sql_alchemy_conn") == f"sqlite:///{HOME_DIR}/airflow/airflow.db"
@pytest.mark.parametrize("display_source", [True, False])
@mock.patch.dict("os.environ", {"AIRFLOW__CORE__SQL_ALCHEMY_CONN_SECRET": "secret_path'"}, clear=True)
@mock.patch("airflow.configuration.get_custom_secret_backend")
def test_conf_as_dict_when_deprecated_value_in_secrets(
self, get_custom_secret_backend, display_source: bool
):
get_custom_secret_backend.return_value.get_config.return_value = "postgresql://"
with use_config(config="empty.cfg"):
cfg_dict = conf.as_dict(
display_source=display_source,
raw=True,
display_sensitive=True,
include_env=True,
include_secret=True,
)
assert cfg_dict["core"].get("sql_alchemy_conn") == (
("postgresql://", "secret") if display_source else "postgresql://"
)
# database should be None because the deprecated value is set in env value
assert cfg_dict["database"].get("sql_alchemy_conn") is None
if not display_source:
remove_all_configurations()
conf.read_dict(dictionary=cfg_dict)
os.environ.clear()
assert conf.get("database", "sql_alchemy_conn") == "postgresql://"
@pytest.mark.parametrize("display_source", [True, False])
@mock.patch.dict("os.environ", {"AIRFLOW__CORE__SQL_ALCHEMY_CONN_SECRET": "secret_path'"}, clear=True)
@mock.patch("airflow.configuration.get_custom_secret_backend")
def test_conf_as_dict_when_deprecated_value_in_secrets_disabled_env(
self, get_custom_secret_backend, display_source: bool
):
get_custom_secret_backend.return_value.get_config.return_value = "postgresql://"
with use_config(config="empty.cfg"):
cfg_dict = conf.as_dict(
display_source=display_source,
raw=True,
display_sensitive=True,
include_env=True,
include_secret=False,
)
assert cfg_dict["core"].get("sql_alchemy_conn") is None
assert cfg_dict["database"].get("sql_alchemy_conn") == (
(f"sqlite:///{HOME_DIR}/airflow/airflow.db", "default")
if display_source
else f"sqlite:///{HOME_DIR}/airflow/airflow.db"
)
if not display_source:
remove_all_configurations()
conf.read_dict(dictionary=cfg_dict)
os.environ.clear()
assert conf.get("database", "sql_alchemy_conn") == f"sqlite:///{HOME_DIR}/airflow/airflow.db"
@pytest.mark.parametrize("display_source", [True, False])
@mock.patch("airflow.configuration.get_custom_secret_backend")
@mock.patch.dict("os.environ", {}, clear=True)
def test_conf_as_dict_when_deprecated_value_in_secrets_disabled_config(
self, get_custom_secret_backend, display_source: bool
):
get_custom_secret_backend.return_value.get_config.return_value = "postgresql://"
with use_config(config="deprecated_secret.cfg"):
cfg_dict = conf.as_dict(
display_source=display_source,
raw=True,
display_sensitive=True,
include_env=True,
include_secret=False,
)
assert cfg_dict["core"].get("sql_alchemy_conn") is None
assert cfg_dict["database"].get("sql_alchemy_conn") == (
(f"sqlite:///{HOME_DIR}/airflow/airflow.db", "default")
if display_source
else f"sqlite:///{HOME_DIR}/airflow/airflow.db"
)
if not display_source:
remove_all_configurations()
conf.read_dict(dictionary=cfg_dict)
os.environ.clear()
assert conf.get("database", "sql_alchemy_conn") == f"sqlite:///{HOME_DIR}/airflow/airflow.db"
def test_as_dict_should_not_falsely_emit_future_warning(self):
from airflow.configuration import AirflowConfigParser
test_conf = AirflowConfigParser()
test_conf.read_dict({"scheduler": {"deactivate_stale_dags_interval": 60}})
with warnings.catch_warnings(record=True) as captured:
test_conf.as_dict()
for w in captured: # only one expected
assert "deactivate_stale_dags_interval option in [scheduler] has been renamed" in str(w.message)
def test_suppress_future_warnings_no_future_warning(self):
from airflow.configuration import AirflowConfigParser
test_conf = AirflowConfigParser()
test_conf.read_dict({"scheduler": {"deactivate_stale_dags_interval": 60}})
with warnings.catch_warnings(record=True) as captured:
test_conf.items("scheduler")
assert len(captured) == 1
c = captured[0]
assert c.category == FutureWarning
assert (
"you should use[scheduler/parsing_cleanup_interval] "
"instead. Please update your `conf.get*`" in str(c.message)
)
with warnings.catch_warnings(record=True) as captured:
with test_conf.suppress_future_warnings():
test_conf.items("scheduler")
assert len(captured) == 1
c = captured[0]
assert c.category == DeprecationWarning
assert (
"deactivate_stale_dags_interval option in [scheduler] "
"has been renamed to parsing_cleanup_interval" in str(c.message)
)
@pytest.mark.parametrize(
"key",
[
param("deactivate_stale_dags_interval", id="old"),
param("parsing_cleanup_interval", id="new"),
],
)
def test_future_warning_only_for_code_ref(self, key):
from airflow.configuration import AirflowConfigParser
old_val = "deactivate_stale_dags_interval"
test_conf = AirflowConfigParser()
test_conf.read_dict({"scheduler": {old_val: 60}}) # config has old value
with warnings.catch_warnings(record=True) as captured:
test_conf.get("scheduler", str(key)) # could be old or new value
w = captured.pop()
assert "the old setting has been used, but please update" in str(w.message)
assert w.category == DeprecationWarning
# only if we use old value, do we also get a warning about code update
if key == old_val:
w = captured.pop()
assert "your `conf.get*` call to use the new name" in str(w.message)
assert w.category == FutureWarning
def test_sensitive_values():
from airflow.settings import conf
# this list was hardcoded prior to 2.6.2
# included here to avoid regression in refactor
# inclusion of keys ending in "password" or "kwargs" is automated from 2.6.2
# items not matching this pattern must be added here manually
sensitive_values = {
("database", "sql_alchemy_conn"),
("core", "fernet_key"),
("celery", "broker_url"),
("celery", "flower_basic_auth"),
("celery", "result_backend"),
("atlas", "password"),
("smtp", "smtp_password"),
("webserver", "secret_key"),
("secrets", "backend_kwargs"),
("sentry", "sentry_dsn"),
("database", "sql_alchemy_engine_args"),
("core", "sql_alchemy_conn"),
}
default_config = default_config_yaml()
all_keys = {(s, k) for s, v in default_config.items() for k in v.get("options")}
suspected_sensitive = {(s, k) for (s, k) in all_keys if k.endswith(("password", "kwargs"))}
exclude_list = {
("kubernetes_executor", "delete_option_kwargs"),
}
suspected_sensitive -= exclude_list
sensitive_values.update(suspected_sensitive)
assert sensitive_values == conf.sensitive_config_values