# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations
import datetime
import functools
import json
import logging
import multiprocessing
import os
import pathlib
import re
import shlex
import stat
import subprocess
import sys
import warnings
from base64 import b64encode
from collections import OrderedDict
# Mypy is ignored on configparser because it thinks the configparser module has no _UNSET attribute
from configparser import _UNSET, ConfigParser, NoOptionError, NoSectionError # type: ignore
from contextlib import contextmanager, suppress
from json.decoder import JSONDecodeError
from re import Pattern
from typing import IO, Any, Dict, Iterable, Set, Tuple, Union
from urllib.parse import urlsplit
from typing_extensions import overload
from airflow.exceptions import AirflowConfigException
from airflow.secrets import DEFAULT_SECRETS_SEARCH_PATH, BaseSecretsBackend
from airflow.utils import yaml
from airflow.utils.module_loading import import_string
from airflow.utils.weight_rule import WeightRule
log = logging.getLogger(__name__)
# show Airflow's deprecation warnings
if not sys.warnoptions:
warnings.filterwarnings(action="default", category=DeprecationWarning, module="airflow")
warnings.filterwarnings(action="default", category=PendingDeprecationWarning, module="airflow")
_SQLITE3_VERSION_PATTERN = re.compile(r"(?P<version>^\d+(?:\.\d+)*)\D?.*$")
ConfigType = Union[str, int, float, bool]
ConfigOptionsDictType = Dict[str, ConfigType]
ConfigSectionSourcesType = Dict[str, Union[str, Tuple[str, str]]]
ConfigSourcesType = Dict[str, ConfigSectionSourcesType]
ENV_VAR_PREFIX = "AIRFLOW__"
def _parse_sqlite_version(s: str) -> tuple[int, ...]:
match = _SQLITE3_VERSION_PATTERN.match(s)
if match is None:
return ()
return tuple(int(p) for p in match.group("version").split("."))
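# Illustrative examples (not executed here): _parse_sqlite_version("3.31.1")
# returns (3, 31, 1), while a string the pattern cannot match returns ().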
@overload
def expand_env_var(env_var: None) -> None:
...
@overload
def expand_env_var(env_var: str) -> str:
...
def expand_env_var(env_var: str | None) -> str | None:
"""
Expands (potentially nested) env vars.
    Repeatedly apply `expandvars` and `expanduser` until
interpolation stops having any effect.
"""
if not env_var:
return env_var
while True:
interpolated = os.path.expanduser(os.path.expandvars(str(env_var)))
if interpolated == env_var:
return interpolated
else:
env_var = interpolated
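# Illustrative sketch (results depend on the calling environment): with
# AIRFLOW_HOME=/opt/airflow exported, expand_env_var("$AIRFLOW_HOME/dags")
# yields "/opt/airflow/dags"; nested references are expanded repeatedly until
# the value no longer changes, and "~" is expanded to the user's home directory.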
def run_command(command: str) -> str:
"""Runs command and returns stdout."""
process = subprocess.Popen(
shlex.split(command), stdout=subprocess.PIPE, stderr=subprocess.PIPE, close_fds=True
)
output, stderr = (stream.decode(sys.getdefaultencoding(), "ignore") for stream in process.communicate())
if process.returncode != 0:
raise AirflowConfigException(
f"Cannot execute {command}. Error code is: {process.returncode}. "
f"Output: {output}, Stderr: {stderr}"
)
return output
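# Illustrative sketch (hypothetical command): run_command("echo secret-value")
# returns "secret-value\n", while a non-zero exit code raises
# AirflowConfigException. This is what backs "{section}__{name}_cmd" options,
# e.g. a sql_alchemy_conn_cmd entry in airflow.cfg.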
def _get_config_value_from_secret_backend(config_key: str) -> str | None:
"""Get Config option values from Secret Backend."""
try:
secrets_client = get_custom_secret_backend()
if not secrets_client:
return None
return secrets_client.get_config(config_key)
except Exception as e:
raise AirflowConfigException(
"Cannot retrieve config from alternative secrets backend. "
"Make sure it is configured properly and that the Backend "
"is accessible.\n"
f"{e}"
)
def _default_config_file_path(file_name: str) -> str:
templates_dir = os.path.join(os.path.dirname(__file__), "config_templates")
return os.path.join(templates_dir, file_name)
def default_config_yaml() -> dict[str, Any]:
"""
Read Airflow configs from YAML file.
:return: Python dictionary containing configs & their info
"""
with open(_default_config_file_path("config.yml")) as config_file:
return yaml.safe_load(config_file)
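# The returned mapping mirrors config.yml and is expected to look roughly like
#   {"<section>": {..., "options": {"<option>": {"sensitive": <bool>, ...}}}};
# sensitive_config_values below relies on the per-section "options" mapping.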
class AirflowConfigParser(ConfigParser):
"""Custom Airflow Configparser supporting defaults and deprecated options."""
# These configuration elements can be fetched as the stdout of commands
# following the "{section}__{name}_cmd" pattern, the idea behind this
# is to not store password on boxes in text files.
# These configs can also be fetched from Secrets backend
# following the "{section}__{name}__secret" pattern
@functools.cached_property
def sensitive_config_values(self) -> Set[tuple[str, str]]: # noqa: UP006
default_config = default_config_yaml()
flattened = {
(s, k): item for s, s_c in default_config.items() for k, item in s_c.get("options").items()
}
sensitive = {(section, key) for (section, key), v in flattened.items() if v.get("sensitive") is True}
depr_option = {self.deprecated_options[x][:-1] for x in sensitive if x in self.deprecated_options}
depr_section = {
(self.deprecated_sections[s][0], k) for s, k in sensitive if s in self.deprecated_sections
}
sensitive.update(depr_section, depr_option)
return sensitive
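    # Illustrative sketch: the set computed above contains pairs such as
    # ("database", "sql_alchemy_conn") plus the deprecated locations they map
    # from (e.g. ("core", "sql_alchemy_conn")), so the *_cmd / *_secret lookups
    # work for both the new and the old option names.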
# A mapping of (new section, new option) -> (old section, old option, since_version).
    # When reading the new option, the old option will be checked to see if it exists. If it does, a
    # DeprecationWarning will be issued and the old option will be used instead.
deprecated_options: dict[tuple[str, str], tuple[str, str, str]] = {
("celery", "worker_precheck"): ("core", "worker_precheck", "2.0.0"),
("logging", "interleave_timestamp_parser"): ("core", "interleave_timestamp_parser", "2.6.1"),
("logging", "base_log_folder"): ("core", "base_log_folder", "2.0.0"),
("logging", "remote_logging"): ("core", "remote_logging", "2.0.0"),
("logging", "remote_log_conn_id"): ("core", "remote_log_conn_id", "2.0.0"),
("logging", "remote_base_log_folder"): ("core", "remote_base_log_folder", "2.0.0"),
("logging", "encrypt_s3_logs"): ("core", "encrypt_s3_logs", "2.0.0"),
("logging", "logging_level"): ("core", "logging_level", "2.0.0"),
("logging", "fab_logging_level"): ("core", "fab_logging_level", "2.0.0"),
("logging", "logging_config_class"): ("core", "logging_config_class", "2.0.0"),
("logging", "colored_console_log"): ("core", "colored_console_log", "2.0.0"),
("logging", "colored_log_format"): ("core", "colored_log_format", "2.0.0"),
("logging", "colored_formatter_class"): ("core", "colored_formatter_class", "2.0.0"),
("logging", "log_format"): ("core", "log_format", "2.0.0"),
("logging", "simple_log_format"): ("core", "simple_log_format", "2.0.0"),
("logging", "task_log_prefix_template"): ("core", "task_log_prefix_template", "2.0.0"),
("logging", "log_filename_template"): ("core", "log_filename_template", "2.0.0"),
("logging", "log_processor_filename_template"): ("core", "log_processor_filename_template", "2.0.0"),
("logging", "dag_processor_manager_log_location"): (
"core",
"dag_processor_manager_log_location",
"2.0.0",
),
("logging", "task_log_reader"): ("core", "task_log_reader", "2.0.0"),
("metrics", "metrics_allow_list"): ("metrics", "statsd_allow_list", "2.6.0"),
("metrics", "metrics_block_list"): ("metrics", "statsd_block_list", "2.6.0"),
("metrics", "statsd_on"): ("scheduler", "statsd_on", "2.0.0"),
("metrics", "statsd_host"): ("scheduler", "statsd_host", "2.0.0"),
("metrics", "statsd_port"): ("scheduler", "statsd_port", "2.0.0"),
("metrics", "statsd_prefix"): ("scheduler", "statsd_prefix", "2.0.0"),
("metrics", "statsd_allow_list"): ("scheduler", "statsd_allow_list", "2.0.0"),
("metrics", "stat_name_handler"): ("scheduler", "stat_name_handler", "2.0.0"),
("metrics", "statsd_datadog_enabled"): ("scheduler", "statsd_datadog_enabled", "2.0.0"),
("metrics", "statsd_datadog_tags"): ("scheduler", "statsd_datadog_tags", "2.0.0"),
("metrics", "statsd_datadog_metrics_tags"): ("scheduler", "statsd_datadog_metrics_tags", "2.6.0"),
("metrics", "statsd_custom_client_path"): ("scheduler", "statsd_custom_client_path", "2.0.0"),
("scheduler", "parsing_processes"): ("scheduler", "max_threads", "1.10.14"),
("scheduler", "scheduler_idle_sleep_time"): ("scheduler", "processor_poll_interval", "2.2.0"),
("operators", "default_queue"): ("celery", "default_queue", "2.1.0"),
("core", "hide_sensitive_var_conn_fields"): ("admin", "hide_sensitive_variable_fields", "2.1.0"),
("core", "sensitive_var_conn_names"): ("admin", "sensitive_variable_fields", "2.1.0"),
("core", "default_pool_task_slot_count"): ("core", "non_pooled_task_slot_count", "1.10.4"),
("core", "max_active_tasks_per_dag"): ("core", "dag_concurrency", "2.2.0"),
("logging", "worker_log_server_port"): ("celery", "worker_log_server_port", "2.2.0"),
("api", "access_control_allow_origins"): ("api", "access_control_allow_origin", "2.2.0"),
("api", "auth_backends"): ("api", "auth_backend", "2.3.0"),
("database", "sql_alchemy_conn"): ("core", "sql_alchemy_conn", "2.3.0"),
("database", "sql_engine_encoding"): ("core", "sql_engine_encoding", "2.3.0"),
("database", "sql_engine_collation_for_ids"): ("core", "sql_engine_collation_for_ids", "2.3.0"),
("database", "sql_alchemy_pool_enabled"): ("core", "sql_alchemy_pool_enabled", "2.3.0"),
("database", "sql_alchemy_pool_size"): ("core", "sql_alchemy_pool_size", "2.3.0"),
("database", "sql_alchemy_max_overflow"): ("core", "sql_alchemy_max_overflow", "2.3.0"),
("database", "sql_alchemy_pool_recycle"): ("core", "sql_alchemy_pool_recycle", "2.3.0"),
("database", "sql_alchemy_pool_pre_ping"): ("core", "sql_alchemy_pool_pre_ping", "2.3.0"),
("database", "sql_alchemy_schema"): ("core", "sql_alchemy_schema", "2.3.0"),
("database", "sql_alchemy_connect_args"): ("core", "sql_alchemy_connect_args", "2.3.0"),
("database", "load_default_connections"): ("core", "load_default_connections", "2.3.0"),
("database", "max_db_retries"): ("core", "max_db_retries", "2.3.0"),
("scheduler", "parsing_cleanup_interval"): ("scheduler", "deactivate_stale_dags_interval", "2.5.0"),
("scheduler", "task_queued_timeout_check_interval"): (
"kubernetes_executor",
"worker_pods_pending_timeout_check_interval",
"2.6.0",
),
}
    # A mapping of a new configuration to a list of old configurations, for when one new
    # configuration deprecates more than one old one. The deprecation logic for these configurations
# is defined in SchedulerJobRunner.
many_to_one_deprecated_options: dict[tuple[str, str], list[tuple[str, str, str]]] = {
("scheduler", "task_queued_timeout"): [
("celery", "stalled_task_timeout", "2.6.0"),
("celery", "task_adoption_timeout", "2.6.0"),
("kubernetes_executor", "worker_pods_pending_timeout", "2.6.0"),
]
}
# A mapping of new section -> (old section, since_version).
deprecated_sections: dict[str, tuple[str, str]] = {"kubernetes_executor": ("kubernetes", "2.5.0")}
# Now build the inverse so we can go from old_section/old_key to new_section/new_key
# if someone tries to retrieve it based on old_section/old_key
@functools.cached_property
def inversed_deprecated_options(self):
return {(sec, name): key for key, (sec, name, ver) in self.deprecated_options.items()}
@functools.cached_property
def inversed_deprecated_sections(self):
return {
old_section: new_section for new_section, (old_section, ver) in self.deprecated_sections.items()
}
# A mapping of old default values that we want to change and warn the user
# about. Mapping of section -> setting -> { old, replace, by_version }
deprecated_values: dict[str, dict[str, tuple[Pattern, str, str]]] = {
"core": {
"hostname_callable": (re.compile(r":"), r".", "2.1"),
},
"webserver": {
"navbar_color": (re.compile(r"\A#007A87\Z", re.IGNORECASE), "#fff", "2.1"),
"dag_default_view": (re.compile(r"^tree$"), "grid", "3.0"),
},
"email": {
"email_backend": (
re.compile(r"^airflow\.contrib\.utils\.sendgrid\.send_email$"),
r"airflow.providers.sendgrid.utils.emailer.send_email",
"2.1",
),
},
"logging": {
"log_filename_template": (
re.compile(re.escape("{{ ti.dag_id }}/{{ ti.task_id }}/{{ ts }}/{{ try_number }}.log")),
"XX-set-after-default-config-loaded-XX",
"3.0",
),
},
"api": {
"auth_backends": (
re.compile(r"^airflow\.api\.auth\.backend\.deny_all$|^$"),
"airflow.api.auth.backend.session",
"3.0",
),
},
"elasticsearch": {
"log_id_template": (
re.compile("^" + re.escape("{dag_id}-{task_id}-{execution_date}-{try_number}") + "$"),
"{dag_id}-{task_id}-{run_id}-{map_index}-{try_number}",
"3.0",
)
},
}
_available_logging_levels = ["CRITICAL", "FATAL", "ERROR", "WARN", "WARNING", "INFO", "DEBUG"]
enums_options = {
("core", "default_task_weight_rule"): sorted(WeightRule.all_weight_rules()),
("core", "dag_ignore_file_syntax"): ["regexp", "glob"],
("core", "mp_start_method"): multiprocessing.get_all_start_methods(),
("scheduler", "file_parsing_sort_mode"): ["modified_time", "random_seeded_by_host", "alphabetical"],
("logging", "logging_level"): _available_logging_levels,
("logging", "fab_logging_level"): _available_logging_levels,
# celery_logging_level can be empty, which uses logging_level as fallback
("logging", "celery_logging_level"): _available_logging_levels + [""],
("webserver", "analytical_tool"): ["google_analytics", "metarouter", "segment", ""],
}
upgraded_values: dict[tuple[str, str], str]
"""Mapping of (section,option) to the old value that was upgraded"""
# This method transforms option names on every read, get, or set operation.
    # It changes the default behaviour of ConfigParser from lower-casing
    # option names to preserving their case.
def optionxform(self, optionstr: str) -> str:
return optionstr
def __init__(self, default_config: str | None = None, *args, **kwargs):
super().__init__(*args, **kwargs)
self.upgraded_values = {}
self.airflow_defaults = ConfigParser(*args, **kwargs)
if default_config is not None:
self.airflow_defaults.read_string(default_config)
# Set the upgrade value based on the current loaded default
default = self.airflow_defaults.get("logging", "log_filename_template", fallback=None)
if default:
replacement = self.deprecated_values["logging"]["log_filename_template"]
self.deprecated_values["logging"]["log_filename_template"] = (
replacement[0],
default,
replacement[2],
)
else:
# In case of tests it might not exist
with suppress(KeyError):
del self.deprecated_values["logging"]["log_filename_template"]
else:
with suppress(KeyError):
del self.deprecated_values["logging"]["log_filename_template"]
self.is_validated = False
self._suppress_future_warnings = False
def validate(self):
self._validate_sqlite3_version()
self._validate_enums()
for section, replacement in self.deprecated_values.items():
for name, info in replacement.items():
old, new, version = info
current_value = self.get(section, name, fallback="")
if self._using_old_value(old, current_value):
self.upgraded_values[(section, name)] = current_value
new_value = old.sub(new, current_value)
self._update_env_var(section=section, name=name, new_value=new_value)
self._create_future_warning(
name=name,
section=section,
current_value=current_value,
new_value=new_value,
version=version,
)
self._upgrade_auth_backends()
self._upgrade_postgres_metastore_conn()
self.is_validated = True
def _upgrade_auth_backends(self):
"""
        Ensure a custom auth_backends setting contains the session backend.
        This is required by the UI for AJAX queries.
"""
old_value = self.get("api", "auth_backends", fallback="")
if old_value in ("airflow.api.auth.backend.default", ""):
# handled by deprecated_values
pass
elif old_value.find("airflow.api.auth.backend.session") == -1:
new_value = old_value + ",airflow.api.auth.backend.session"
self._update_env_var(section="api", name="auth_backends", new_value=new_value)
self.upgraded_values[("api", "auth_backends")] = old_value
# if the old value is set via env var, we need to wipe it
# otherwise, it'll "win" over our adjusted value
old_env_var = self._env_var_name("api", "auth_backend")
os.environ.pop(old_env_var, None)
warnings.warn(
"The auth_backends setting in [api] has had airflow.api.auth.backend.session added "
"in the running config, which is needed by the UI. Please update your config before "
"Apache Airflow 3.0.",
FutureWarning,
)
def _upgrade_postgres_metastore_conn(self):
"""
        Upgrade SQL connection URL schemes.
As of SQLAlchemy 1.4, schemes `postgres+psycopg2` and `postgres`
must be replaced with `postgresql`.
"""
section, key = "database", "sql_alchemy_conn"
old_value = self.get(section, key, _extra_stacklevel=1)
bad_schemes = ["postgres+psycopg2", "postgres"]
good_scheme = "postgresql"
parsed = urlsplit(old_value)
if parsed.scheme in bad_schemes:
warnings.warn(
f"Bad scheme in Airflow configuration core > sql_alchemy_conn: `{parsed.scheme}`. "
"As of SQLAlchemy 1.4 (adopted in Airflow 2.3) this is no longer supported. You must "
f"change to `{good_scheme}` before the next Airflow release.",
FutureWarning,
)
self.upgraded_values[(section, key)] = old_value
new_value = re.sub("^" + re.escape(f"{parsed.scheme}://"), f"{good_scheme}://", old_value)
self._update_env_var(section=section, name=key, new_value=new_value)
# if the old value is set via env var, we need to wipe it
# otherwise, it'll "win" over our adjusted value
old_env_var = self._env_var_name("core", key)
os.environ.pop(old_env_var, None)
def _validate_enums(self):
"""Validate that enum type config has an accepted value."""
for (section_key, option_key), enum_options in self.enums_options.items():
if self.has_option(section_key, option_key):
value = self.get(section_key, option_key)
if value not in enum_options:
raise AirflowConfigException(
f"`[{section_key}] {option_key}` should not be "
f"{value!r}. Possible values: {', '.join(enum_options)}."
)
def _validate_sqlite3_version(self):
"""Validate SQLite version.
Some features in storing rendered fields require SQLite >= 3.15.0.
"""
if "sqlite" not in self.get("database", "sql_alchemy_conn"):
return
import sqlite3
min_sqlite_version = (3, 15, 0)
if _parse_sqlite_version(sqlite3.sqlite_version) >= min_sqlite_version:
return
from airflow.utils.docs import get_docs_url
min_sqlite_version_str = ".".join(str(s) for s in min_sqlite_version)
raise AirflowConfigException(
f"error: SQLite C library too old (< {min_sqlite_version_str}). "
f"See {get_docs_url('howto/set-up-database.html#setting-up-a-sqlite-database')}"
)
def _using_old_value(self, old: Pattern, current_value: str) -> bool:
return old.search(current_value) is not None
def _update_env_var(self, section: str, name: str, new_value: str):
env_var = self._env_var_name(section, name)
# Set it as an env var so that any subprocesses keep the same override!
os.environ[env_var] = new_value
@staticmethod
def _create_future_warning(name: str, section: str, current_value: Any, new_value: Any, version: str):
warnings.warn(
f"The {name!r} setting in [{section}] has the old default value of {current_value!r}. "
f"This value has been changed to {new_value!r} in the running config, but "
f"please update your config before Apache Airflow {version}.",
FutureWarning,
)
def _env_var_name(self, section: str, key: str) -> str:
return f"{ENV_VAR_PREFIX}{section.replace('.', '_').upper()}__{key.upper()}"
def _get_env_var_option(self, section: str, key: str):
# must have format AIRFLOW__{SECTION}__{KEY} (note double underscore)
env_var = self._env_var_name(section, key)
if env_var in os.environ:
return expand_env_var(os.environ[env_var])
# alternatively AIRFLOW__{SECTION}__{KEY}_CMD (for a command)
env_var_cmd = env_var + "_CMD"
if env_var_cmd in os.environ:
# if this is a valid command key...
if (section, key) in self.sensitive_config_values:
return run_command(os.environ[env_var_cmd])
# alternatively AIRFLOW__{SECTION}__{KEY}_SECRET (to get from Secrets Backend)
env_var_secret_path = env_var + "_SECRET"
if env_var_secret_path in os.environ:
# if this is a valid secret path...
if (section, key) in self.sensitive_config_values:
return _get_config_value_from_secret_backend(os.environ[env_var_secret_path])
return None
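    # Illustrative sketch of the three environment variable forms checked above,
    # using the (sensitive) sql_alchemy_conn option as an example:
    #   AIRFLOW__DATABASE__SQL_ALCHEMY_CONN=postgresql://...      -> used verbatim
    #   AIRFLOW__DATABASE__SQL_ALCHEMY_CONN_CMD="cat /run/db_uri" -> stdout of the command
    #   AIRFLOW__DATABASE__SQL_ALCHEMY_CONN_SECRET=db_uri         -> fetched from the secrets backend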
def _get_cmd_option(self, section: str, key: str):
fallback_key = key + "_cmd"
if (section, key) in self.sensitive_config_values:
if super().has_option(section, fallback_key):
command = super().get(section, fallback_key)
return run_command(command)
return None
def _get_cmd_option_from_config_sources(
self, config_sources: ConfigSourcesType, section: str, key: str
) -> str | None:
fallback_key = key + "_cmd"
if (section, key) in self.sensitive_config_values:
section_dict = config_sources.get(section)
if section_dict is not None:
command_value = section_dict.get(fallback_key)
if command_value is not None:
if isinstance(command_value, str):
command = command_value
else:
command = command_value[0]
return run_command(command)
return None
def _get_secret_option(self, section: str, key: str) -> str | None:
"""Get Config option values from Secret Backend."""
fallback_key = key + "_secret"
if (section, key) in self.sensitive_config_values:
if super().has_option(section, fallback_key):
secrets_path = super().get(section, fallback_key)
return _get_config_value_from_secret_backend(secrets_path)
return None
def _get_secret_option_from_config_sources(
self, config_sources: ConfigSourcesType, section: str, key: str
) -> str | None:
fallback_key = key + "_secret"
if (section, key) in self.sensitive_config_values:
section_dict = config_sources.get(section)
if section_dict is not None:
secrets_path_value = section_dict.get(fallback_key)
if secrets_path_value is not None:
if isinstance(secrets_path_value, str):
secrets_path = secrets_path_value
else:
secrets_path = secrets_path_value[0]
return _get_config_value_from_secret_backend(secrets_path)
return None
def get_mandatory_value(self, section: str, key: str, **kwargs) -> str:
value = self.get(section, key, _extra_stacklevel=1, **kwargs)
if value is None:
raise ValueError(f"The value {section}/{key} should be set!")
return value
@overload # type: ignore[override]
def get(self, section: str, key: str, fallback: str = ..., **kwargs) -> str: # type: ignore[override]
...
@overload # type: ignore[override]
def get(self, section: str, key: str, **kwargs) -> str | None: # type: ignore[override]
...
def get( # type: ignore[override, misc]
self,
section: str,
key: str,
_extra_stacklevel: int = 0,
**kwargs,
) -> str | None:
section = str(section).lower()
key = str(key).lower()
warning_emitted = False
deprecated_section: str | None
deprecated_key: str | None
# For when we rename whole sections
if section in self.inversed_deprecated_sections:
deprecated_section, deprecated_key = (section, key)
section = self.inversed_deprecated_sections[section]
if not self._suppress_future_warnings:
warnings.warn(
f"The config section [{deprecated_section}] has been renamed to "
f"[{section}]. Please update your `conf.get*` call to use the new name",
FutureWarning,
stacklevel=2 + _extra_stacklevel,
)
# Don't warn about individual rename if the whole section is renamed
warning_emitted = True
elif (section, key) in self.inversed_deprecated_options:
# Handle using deprecated section/key instead of the new section/key
new_section, new_key = self.inversed_deprecated_options[(section, key)]
if not self._suppress_future_warnings and not warning_emitted:
warnings.warn(
f"section/key [{section}/{key}] has been deprecated, you should use"
f"[{new_section}/{new_key}] instead. Please update your `conf.get*` call to use the "
"new name",
FutureWarning,
stacklevel=2 + _extra_stacklevel,
)
warning_emitted = True
deprecated_section, deprecated_key = section, key
section, key = (new_section, new_key)
elif section in self.deprecated_sections:
# When accessing the new section name, make sure we check under the old config name
deprecated_key = key
deprecated_section = self.deprecated_sections[section][0]
else:
deprecated_section, deprecated_key, _ = self.deprecated_options.get(
(section, key), (None, None, None)
)
# first check environment variables
option = self._get_environment_variables(
deprecated_key,
deprecated_section,
key,
section,
issue_warning=not warning_emitted,
extra_stacklevel=_extra_stacklevel,
)
if option is not None:
return option
# ...then the config file
option = self._get_option_from_config_file(
deprecated_key,
deprecated_section,
key,
kwargs,
section,
issue_warning=not warning_emitted,
extra_stacklevel=_extra_stacklevel,
)
if option is not None:
return option
# ...then commands
option = self._get_option_from_commands(
deprecated_key,
deprecated_section,
key,
section,
issue_warning=not warning_emitted,
extra_stacklevel=_extra_stacklevel,
)
if option is not None:
return option
# ...then from secret backends
option = self._get_option_from_secrets(
deprecated_key,
deprecated_section,
key,
section,
issue_warning=not warning_emitted,
extra_stacklevel=_extra_stacklevel,
)
if option is not None:
return option
# ...then the default config
if self.airflow_defaults.has_option(section, key) or "fallback" in kwargs:
return expand_env_var(self.airflow_defaults.get(section, key, **kwargs))
log.warning("section/key [%s/%s] not found in config", section, key)
raise AirflowConfigException(f"section/key [{section}/{key}] not found in config")
def _get_option_from_secrets(
self,
deprecated_key: str | None,
deprecated_section: str | None,
key: str,
section: str,
issue_warning: bool = True,
extra_stacklevel: int = 0,
) -> str | None:
option = self._get_secret_option(section, key)
if option:
return option
if deprecated_section and deprecated_key:
with self.suppress_future_warnings():
option = self._get_secret_option(deprecated_section, deprecated_key)
if option:
if issue_warning:
self._warn_deprecate(section, key, deprecated_section, deprecated_key, extra_stacklevel)
return option
return None
def _get_option_from_commands(
self,
deprecated_key: str | None,
deprecated_section: str | None,
key: str,
section: str,
issue_warning: bool = True,
extra_stacklevel: int = 0,
) -> str | None:
option = self._get_cmd_option(section, key)
if option:
return option
if deprecated_section and deprecated_key:
with self.suppress_future_warnings():
option = self._get_cmd_option(deprecated_section, deprecated_key)
if option:
if issue_warning:
self._warn_deprecate(section, key, deprecated_section, deprecated_key, extra_stacklevel)
return option
return None
def _get_option_from_config_file(
self,
deprecated_key: str | None,
deprecated_section: str | None,
key: str,
kwargs: dict[str, Any],
section: str,
issue_warning: bool = True,
extra_stacklevel: int = 0,
) -> str | None:
if super().has_option(section, key):
# Use the parent's methods to get the actual config here to be able to
# separate the config from default config.
return expand_env_var(super().get(section, key, **kwargs))
if deprecated_section and deprecated_key:
if super().has_option(deprecated_section, deprecated_key):
if issue_warning:
self._warn_deprecate(section, key, deprecated_section, deprecated_key, extra_stacklevel)
with self.suppress_future_warnings():
return expand_env_var(super().get(deprecated_section, deprecated_key, **kwargs))
return None
def _get_environment_variables(
self,
deprecated_key: str | None,
deprecated_section: str | None,
key: str,
section: str,
issue_warning: bool = True,
extra_stacklevel: int = 0,
) -> str | None:
option = self._get_env_var_option(section, key)
if option is not None:
return option
if deprecated_section and deprecated_key:
with self.suppress_future_warnings():
option = self._get_env_var_option(deprecated_section, deprecated_key)
if option is not None:
if issue_warning:
self._warn_deprecate(section, key, deprecated_section, deprecated_key, extra_stacklevel)
return option
return None
def getboolean(self, section: str, key: str, **kwargs) -> bool: # type: ignore[override]
val = str(self.get(section, key, _extra_stacklevel=1, **kwargs)).lower().strip()
if "#" in val:
val = val.split("#")[0].strip()
if val in ("t", "true", "1"):
return True
elif val in ("f", "false", "0"):
return False
else:
raise AirflowConfigException(
f'Failed to convert value to bool. Please check "{key}" key in "{section}" section. '
f'Current value: "{val}".'
)
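    # Illustrative sketch: conf.getboolean("core", "unit_test_mode") accepts
    # "True"/"t"/"1" and "False"/"f"/"0" (case-insensitive, trailing "#"
    # comments stripped); anything else raises AirflowConfigException.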
def getint(self, section: str, key: str, **kwargs) -> int: # type: ignore[override]
val = self.get(section, key, _extra_stacklevel=1, **kwargs)
if val is None:
raise AirflowConfigException(
f"Failed to convert value None to int. "
f'Please check "{key}" key in "{section}" section is set.'
)
try:
return int(val)
except ValueError:
raise AirflowConfigException(
f'Failed to convert value to int. Please check "{key}" key in "{section}" section. '
f'Current value: "{val}".'
)
def getfloat(self, section: str, key: str, **kwargs) -> float: # type: ignore[override]
val = self.get(section, key, _extra_stacklevel=1, **kwargs)
if val is None:
raise AirflowConfigException(
f"Failed to convert value None to float. "
f'Please check "{key}" key in "{section}" section is set.'
)
try:
return float(val)
except ValueError:
raise AirflowConfigException(
f'Failed to convert value to float. Please check "{key}" key in "{section}" section. '
f'Current value: "{val}".'
)
def getimport(self, section: str, key: str, **kwargs) -> Any:
"""
        Reads the option, imports the fully qualified name, and returns the object.
        In case of failure, it throws an exception with the key and section names.
:return: The object or None, if the option is empty
"""
full_qualified_path = conf.get(section=section, key=key, **kwargs)
if not full_qualified_path:
return None
try:
return import_string(full_qualified_path)
except ImportError as e:
log.error(e)
raise AirflowConfigException(
f'The object could not be loaded. Please check "{key}" key in "{section}" section. '
f'Current value: "{full_qualified_path}".'
)
def getjson(
self, section: str, key: str, fallback=_UNSET, **kwargs
) -> dict | list | str | int | float | None:
"""
Return a config value parsed from a JSON string.
``fallback`` is *not* JSON parsed but used verbatim when no config value is given.
"""
        # get always returns the fallback value as a string, so if someone
        # gives us an object here we want to keep it as-is
default = _UNSET
if fallback is not _UNSET:
default = fallback
fallback = _UNSET
try:
data = self.get(section=section, key=key, fallback=fallback, _extra_stacklevel=1, **kwargs)
except (NoSectionError, NoOptionError):
return default
if not data:
return default if default is not _UNSET else None
try:
return json.loads(data)
except JSONDecodeError as e:
raise AirflowConfigException(f"Unable to parse [{section}] {key!r} as valid json") from e
def gettimedelta(
self, section: str, key: str, fallback: Any = None, **kwargs
) -> datetime.timedelta | None:
"""
        Gets the config value for the given section and key, and converts it into a datetime.timedelta object.
        If the key is missing, it is treated as `None`.
:param section: the section from the config
:param key: the key defined in the given section
:param fallback: fallback value when no config value is given, defaults to None
        :raises AirflowConfigException: raised on ValueError or OverflowError
:return: datetime.timedelta(seconds=<config_value>) or None
"""
val = self.get(section, key, fallback=fallback, _extra_stacklevel=1, **kwargs)
if val:
# the given value must be convertible to integer
try:
int_val = int(val)
except ValueError:
raise AirflowConfigException(
f'Failed to convert value to int. Please check "{key}" key in "{section}" section. '
f'Current value: "{val}".'
)
try:
return datetime.timedelta(seconds=int_val)
except OverflowError as err:
raise AirflowConfigException(
f"Failed to convert value to timedelta in `seconds`. "
f"{err}. "
f'Please check "{key}" key in "{section}" section. Current value: "{val}".'
)
return fallback
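    # Illustrative sketch (hypothetical option): a value of "30" for
    # ("my_section", "my_timeout") yields datetime.timedelta(seconds=30);
    # a non-integer value raises AirflowConfigException and a missing value
    # returns the ``fallback`` argument.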
def read(
self,
filenames: (str | bytes | os.PathLike | Iterable[str | bytes | os.PathLike]),
encoding=None,
):
super().read(filenames=filenames, encoding=encoding)
    # RawConfigParser annotates this parameter with "Mapping" from collections.abc, which is not
    # subscriptable - so we have to use Dict here.
def read_dict( # type: ignore[override]
self, dictionary: dict[str, dict[str, Any]], source: str = "<dict>"
):
super().read_dict(dictionary=dictionary, source=source)
def has_option(self, section: str, option: str) -> bool:
try:
# Using self.get() to avoid reimplementing the priority order
# of config variables (env, config, cmd, defaults)
# UNSET to avoid logging a warning about missing values
self.get(section, option, fallback=_UNSET, _extra_stacklevel=1)
return True
except (NoOptionError, NoSectionError):
return False
def remove_option(self, section: str, option: str, remove_default: bool = True):
"""
        Remove an option if it exists in the config read from a file or in the default config.
        If both configs have the same option, this removes the option
        from both unless remove_default=False.
"""
if super().has_option(section, option):
super().remove_option(section, option)
if self.airflow_defaults.has_option(section, option) and remove_default:
self.airflow_defaults.remove_option(section, option)
def getsection(self, section: str) -> ConfigOptionsDictType | None:
"""
Returns the section as a dict.
Values are converted to int, float, bool as required.
:param section: section from the config
"""
if not self.has_section(section) and not self.airflow_defaults.has_section(section):
return None
if self.airflow_defaults.has_section(section):
_section: ConfigOptionsDictType = OrderedDict(self.airflow_defaults.items(section))
else:
_section = OrderedDict()
if self.has_section(section):
_section.update(OrderedDict(self.items(section)))
section_prefix = self._env_var_name(section, "")
for env_var in sorted(os.environ.keys()):
if env_var.startswith(section_prefix):
key = env_var.replace(section_prefix, "")
if key.endswith("_CMD"):
key = key[:-4]
key = key.lower()
_section[key] = self._get_env_var_option(section, key)
for key, val in _section.items():
if val is None:
raise AirflowConfigException(
f"Failed to convert value automatically. "
f'Please check "{key}" key in "{section}" section is set.'
)
try:
_section[key] = int(val)
except ValueError:
try:
_section[key] = float(val)
except ValueError:
if isinstance(val, str) and val.lower() in ("t", "true"):
_section[key] = True
elif isinstance(val, str) and val.lower() in ("f", "false"):
_section[key] = False
return _section
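    # Illustrative sketch: conf.getsection("core") might return something like
    # {"parallelism": 32, "load_examples": True, "dags_folder": "/opt/airflow/dags", ...}
    # with values coerced to int/float/bool where possible and env-var overrides applied.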
def write( # type: ignore[override]
self, fp: IO, space_around_delimiters: bool = True, section: str | None = None
) -> None:
# This is based on the configparser.RawConfigParser.write method code to add support for
# reading options from environment variables.
# Various type ignores below deal with less-than-perfect RawConfigParser superclass typing
if space_around_delimiters:
delimiter = f" {self._delimiters[0]} " # type: ignore[attr-defined]
else:
delimiter = self._delimiters[0] # type: ignore[attr-defined]
if self._defaults: # type: ignore
self._write_section( # type: ignore[attr-defined]
fp, self.default_section, self._defaults.items(), delimiter # type: ignore[attr-defined]
)
sections = (
{section: dict(self.getsection(section))} # type: ignore[arg-type]
if section
else self._sections # type: ignore[attr-defined]
)
for sect in sections:
item_section: ConfigOptionsDictType = self.getsection(sect) # type: ignore[assignment]
self._write_section(fp, sect, item_section.items(), delimiter) # type: ignore[attr-defined]
def as_dict(
self,
display_source: bool = False,
display_sensitive: bool = False,
raw: bool = False,
include_env: bool = True,
include_cmds: bool = True,
include_secret: bool = True,
) -> ConfigSourcesType:
"""
Returns the current configuration as an OrderedDict of OrderedDicts.
        When materializing the current configuration, Airflow defaults are
        included along with user-set configs. If any of the `include_*`
        options are False, the results of command or secret key
        configs do not override Airflow defaults and are instead passed through.
        To then prevent Airflow defaults from overwriting user-set
        command or secret key configs, we filter out bare sensitive_config_values
        that are set to Airflow defaults when the command or secret key configs
        produce different values.
:param display_source: If False, the option value is returned. If True,
a tuple of (option_value, source) is returned. Source is either
'airflow.cfg', 'default', 'env var', or 'cmd'.
:param display_sensitive: If True, the values of options set by env
vars and bash commands will be displayed. If False, those options
are shown as '< hidden >'
:param raw: Should the values be output as interpolated values, or the
"raw" form that can be fed back in to ConfigParser
:param include_env: Should the value of configuration from AIRFLOW__
environment variables be included or not
:param include_cmds: Should the result of calling any *_cmd config be
set (True, default), or should the _cmd options be left as the
command to run (False)
:param include_secret: Should the result of calling any *_secret config be
set (True, default), or should the _secret options be left as the
path to get the secret from (False)
:return: Dictionary, where the key is the name of the section and the content is
the dictionary with the name of the parameter and its value.
"""
if not display_sensitive:
            # We want to hide the sensitive values in the appropriate methods,
            # since env vars from cmds and secrets can be read in the _include_envs method
if not all([include_env, include_cmds, include_secret]):
raise ValueError(
"If display_sensitive is false, then include_env, "
"include_cmds, include_secret must all be set as True"
)
config_sources: ConfigSourcesType = {}
configs = [
("default", self.airflow_defaults),
("airflow.cfg", self),
]
self._replace_config_with_display_sources(
config_sources,
configs,
display_source,
raw,
self.deprecated_options,
include_cmds=include_cmds,
include_env=include_env,
include_secret=include_secret,
)
# add env vars and overwrite because they have priority
if include_env:
self._include_envs(config_sources, display_sensitive, display_source, raw)
else:
self._filter_by_source(config_sources, display_source, self._get_env_var_option)
# add bash commands
if include_cmds:
self._include_commands(config_sources, display_sensitive, display_source, raw)
else:
self._filter_by_source(config_sources, display_source, self._get_cmd_option)
# add config from secret backends
if include_secret:
self._include_secrets(config_sources, display_sensitive, display_source, raw)
else:
self._filter_by_source(config_sources, display_source, self._get_secret_option)
if not display_sensitive:
            # This ensures the ones from the config file are hidden too
            # if they are not provided through env, cmd or secret
hidden = "< hidden >"
for section, key in self.sensitive_config_values:
if not config_sources.get(section):
continue
if config_sources[section].get(key, None):
if display_source:
source = config_sources[section][key][1]
config_sources[section][key] = (hidden, source)
else:
config_sources[section][key] = hidden
return config_sources
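    # Illustrative sketch: conf.as_dict(display_source=True)["core"]["dags_folder"]
    # might return ("/opt/airflow/dags", "airflow.cfg"), whereas with the default
    # display_source=False only the plain value string is returned.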
def _include_secrets(
self,
config_sources: ConfigSourcesType,
display_sensitive: bool,
display_source: bool,
raw: bool,
):
for section, key in self.sensitive_config_values:
value: str | None = self._get_secret_option_from_config_sources(config_sources, section, key)
if value:
if not display_sensitive:
value = "< hidden >"
if display_source:
opt: str | tuple[str, str] = (value, "secret")
elif raw:
opt = value.replace("%", "%%")
else:
opt = value
config_sources.setdefault(section, OrderedDict()).update({key: opt})
del config_sources[section][key + "_secret"]
def _include_commands(
self,
config_sources: ConfigSourcesType,
display_sensitive: bool,
display_source: bool,
raw: bool,
):
for section, key in self.sensitive_config_values:
opt = self._get_cmd_option_from_config_sources(config_sources, section, key)
if not opt:
continue
opt_to_set: str | tuple[str, str] | None = opt
if not display_sensitive:
opt_to_set = "< hidden >"
if display_source:
opt_to_set = (str(opt_to_set), "cmd")
elif raw:
opt_to_set = str(opt_to_set).replace("%", "%%")
if opt_to_set is not None:
dict_to_update: dict[str, str | tuple[str, str]] = {key: opt_to_set}
config_sources.setdefault(section, OrderedDict()).update(dict_to_update)
del config_sources[section][key + "_cmd"]
def _include_envs(
self,
config_sources: ConfigSourcesType,
display_sensitive: bool,
display_source: bool,
raw: bool,
):
for env_var in [
os_environment for os_environment in os.environ if os_environment.startswith(ENV_VAR_PREFIX)
]:
try:
_, section, key = env_var.split("__", 2)
opt = self._get_env_var_option(section, key)
except ValueError:
continue
if opt is None:
log.warning("Ignoring unknown env var '%s'", env_var)
continue
if not display_sensitive and env_var != self._env_var_name("core", "unit_test_mode"):
# Don't hide cmd/secret values here
if not env_var.lower().endswith("cmd") and not env_var.lower().endswith("secret"):
if (section, key) in self.sensitive_config_values:
opt = "< hidden >"
elif raw:
opt = opt.replace("%", "%%")
if display_source:
opt = (opt, "env var")
section = section.lower()
            # If we lower-cased the key for the kubernetes_environment_variables section,
            # then we wouldn't be able to set any Airflow environment
            # variables. Airflow only parses environment variables that start
            # with AIRFLOW_. Therefore, we need to make it a special case.
if section != "kubernetes_environment_variables":
key = key.lower()
config_sources.setdefault(section, OrderedDict()).update({key: opt})
def _filter_by_source(
self,
config_sources: ConfigSourcesType,
display_source: bool,
getter_func,
):
"""
        Deletes default configs from the current configuration (an OrderedDict of
        OrderedDicts) if they would conflict with special sensitive_config_values.
        This is necessary because bare configs take precedence over the command
        or secret key equivalents, so if the current running config is
        materialized with Airflow defaults they would in turn override user-set
        command or secret key configs.
:param config_sources: The current configuration to operate on
:param display_source: If False, configuration options contain raw
values. If True, options are a tuple of (option_value, source).
Source is either 'airflow.cfg', 'default', 'env var', or 'cmd'.
:param getter_func: A callback function that gets the user configured
override value for a particular sensitive_config_values config.
:return: None, the given config_sources is filtered if necessary,
otherwise untouched.
"""
for section, key in self.sensitive_config_values:
# Don't bother if we don't have section / key
if section not in config_sources or key not in config_sources[section]:
continue
# Check that there is something to override defaults
try:
getter_opt = getter_func(section, key)
except ValueError:
continue
if not getter_opt:
continue
# Check to see that there is a default value
if not self.airflow_defaults.has_option(section, key):
continue
# Check to see if bare setting is the same as defaults
if display_source:
# when display_source = true, we know that the config_sources contains tuple
opt, source = config_sources[section][key] # type: ignore
else:
opt = config_sources[section][key]
if opt == self.airflow_defaults.get(section, key):
del config_sources[section][key]
@staticmethod
def _replace_config_with_display_sources(
config_sources: ConfigSourcesType,
configs: Iterable[tuple[str, ConfigParser]],
display_source: bool,
raw: bool,
deprecated_options: dict[tuple[str, str], tuple[str, str, str]],
include_env: bool,
include_cmds: bool,
include_secret: bool,
):
for source_name, config in configs:
for section in config.sections():
AirflowConfigParser._replace_section_config_with_display_sources(
config,
config_sources,
display_source,
raw,
section,
source_name,
deprecated_options,
configs,
include_env=include_env,
include_cmds=include_cmds,
include_secret=include_secret,
)
@staticmethod
def _deprecated_value_is_set_in_config(
deprecated_section: str,
deprecated_key: str,
configs: Iterable[tuple[str, ConfigParser]],
) -> bool:
for config_type, config in configs:
if config_type == "default":
continue
try:
deprecated_section_array = config.items(section=deprecated_section, raw=True)
for key_candidate, _ in deprecated_section_array:
if key_candidate == deprecated_key:
return True
except NoSectionError:
pass
return False
@staticmethod
def _deprecated_variable_is_set(deprecated_section: str, deprecated_key: str) -> bool:
return (
os.environ.get(f"{ENV_VAR_PREFIX}{deprecated_section.upper()}__{deprecated_key.upper()}")
is not None
)
@staticmethod
def _deprecated_command_is_set_in_config(
deprecated_section: str, deprecated_key: str, configs: Iterable[tuple[str, ConfigParser]]
) -> bool:
return AirflowConfigParser._deprecated_value_is_set_in_config(
deprecated_section=deprecated_section, deprecated_key=deprecated_key + "_cmd", configs=configs
)
@staticmethod
def _deprecated_variable_command_is_set(deprecated_section: str, deprecated_key: str) -> bool:
return (
os.environ.get(f"{ENV_VAR_PREFIX}{deprecated_section.upper()}__{deprecated_key.upper()}_CMD")
is not None
)
@staticmethod
def _deprecated_secret_is_set_in_config(
deprecated_section: str, deprecated_key: str, configs: Iterable[tuple[str, ConfigParser]]
) -> bool:
return AirflowConfigParser._deprecated_value_is_set_in_config(
deprecated_section=deprecated_section, deprecated_key=deprecated_key + "_secret", configs=configs
)
@staticmethod
def _deprecated_variable_secret_is_set(deprecated_section: str, deprecated_key: str) -> bool:
return (
os.environ.get(f"{ENV_VAR_PREFIX}{deprecated_section.upper()}__{deprecated_key.upper()}_SECRET")
is not None
)
@contextmanager
def suppress_future_warnings(self):
suppress_future_warnings = self._suppress_future_warnings
self._suppress_future_warnings = True
yield self
self._suppress_future_warnings = suppress_future_warnings
@staticmethod
def _replace_section_config_with_display_sources(
config: ConfigParser,
config_sources: ConfigSourcesType,
display_source: bool,
raw: bool,
section: str,
source_name: str,
deprecated_options: dict[tuple[str, str], tuple[str, str, str]],
configs: Iterable[tuple[str, ConfigParser]],
include_env: bool,
include_cmds: bool,
include_secret: bool,
):
sect = config_sources.setdefault(section, OrderedDict())
if isinstance(config, AirflowConfigParser):
with config.suppress_future_warnings():
items = config.items(section=section, raw=raw)
else:
items = config.items(section=section, raw=raw)
for k, val in items:
deprecated_section, deprecated_key, _ = deprecated_options.get((section, k), (None, None, None))
if deprecated_section and deprecated_key:
if source_name == "default":
                    # If the deprecated entry has some non-default value set for any of the sources requested,
                    # we should NOT set a default for the new entry (because it would override anything
                    # coming from the deprecated ones).
if AirflowConfigParser._deprecated_value_is_set_in_config(
deprecated_section, deprecated_key, configs
):
continue
if include_env and AirflowConfigParser._deprecated_variable_is_set(
deprecated_section, deprecated_key
):
continue
if include_cmds and (
AirflowConfigParser._deprecated_variable_command_is_set(
deprecated_section, deprecated_key
)
or AirflowConfigParser._deprecated_command_is_set_in_config(
deprecated_section, deprecated_key, configs
)
):
continue
if include_secret and (
AirflowConfigParser._deprecated_variable_secret_is_set(
deprecated_section, deprecated_key
)
or AirflowConfigParser._deprecated_secret_is_set_in_config(
deprecated_section, deprecated_key, configs
)
):
continue
if display_source:
sect[k] = (val, source_name)
else:
sect[k] = val
def load_test_config(self):
"""
Load the unit test configuration.
Note: this is not reversible.
"""
# remove all sections, falling back to defaults
for section in self.sections():
self.remove_section(section)
# then read test config
path = _default_config_file_path("default_test.cfg")
log.info("Reading default test configuration from %s", path)
self.read_string(_parameterized_config_from_template("default_test.cfg"))
# then read any "custom" test settings
log.info("Reading test configuration from %s", TEST_CONFIG_FILE)
self.read(TEST_CONFIG_FILE)
@staticmethod
def _warn_deprecate(
section: str, key: str, deprecated_section: str, deprecated_name: str, extra_stacklevel: int
):
if section == deprecated_section:
warnings.warn(
f"The {deprecated_name} option in [{section}] has been renamed to {key} - "
f"the old setting has been used, but please update your config.",
DeprecationWarning,
stacklevel=4 + extra_stacklevel,
)
else:
warnings.warn(
f"The {deprecated_name} option in [{deprecated_section}] has been moved to the {key} option "
f"in [{section}] - the old setting has been used, but please update your config.",
DeprecationWarning,
stacklevel=4 + extra_stacklevel,
)
def __getstate__(self):
return {
name: getattr(self, name)
for name in [
"_sections",
"is_validated",
"airflow_defaults",
]
}
def __setstate__(self, state):
self.__init__()
config = state.pop("_sections")
self.read_dict(config)
self.__dict__.update(state)
def get_airflow_home() -> str:
"""Get path to Airflow Home."""
return expand_env_var(os.environ.get("AIRFLOW_HOME", "~/airflow"))
def get_airflow_config(airflow_home) -> str:
"""Get Path to airflow.cfg path."""
airflow_config_var = os.environ.get("AIRFLOW_CONFIG")
if airflow_config_var is None:
return os.path.join(airflow_home, "airflow.cfg")
return expand_env_var(airflow_config_var)
def _parameterized_config_from_template(filename) -> str:
TEMPLATE_START = "# ----------------------- TEMPLATE BEGINS HERE -----------------------\n"
path = _default_config_file_path(filename)
with open(path) as fh:
for line in fh:
if line != TEMPLATE_START:
continue
return parameterized_config(fh.read().strip())
raise RuntimeError(f"Template marker not found in {path!r}")
def parameterized_config(template) -> str:
"""
Generates configuration from provided template & variables defined in current scope.
:param template: a config content templated with {{variables}}
"""
all_vars = {k: v for d in [globals(), locals()] for k, v in d.items()}
return template.format(**all_vars)
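# Illustrative sketch (hypothetical template): with AIRFLOW_HOME defined in this
# module's scope, parameterized_config("dags_folder = {AIRFLOW_HOME}/dags")
# returns "dags_folder = /opt/airflow/dags" when AIRFLOW_HOME is "/opt/airflow".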
def get_airflow_test_config(airflow_home) -> str:
"""Get path to unittests.cfg."""
if "AIRFLOW_TEST_CONFIG" not in os.environ:
return os.path.join(airflow_home, "unittests.cfg")
# It will never return None
return expand_env_var(os.environ["AIRFLOW_TEST_CONFIG"]) # type: ignore[return-value]
def _generate_fernet_key() -> str:
from cryptography.fernet import Fernet
return Fernet.generate_key().decode()
def initialize_config() -> AirflowConfigParser:
"""
Load the Airflow config files.
Called for you automatically as part of the Airflow boot process.
"""
global FERNET_KEY, AIRFLOW_HOME, WEBSERVER_CONFIG
default_config = _parameterized_config_from_template("default_airflow.cfg")
local_conf = AirflowConfigParser(default_config=default_config)
if local_conf.getboolean("core", "unit_test_mode"):
# Load test config only
if not os.path.isfile(TEST_CONFIG_FILE):
from cryptography.fernet import Fernet
log.info("Creating new Airflow config file for unit tests in: %s", TEST_CONFIG_FILE)
pathlib.Path(AIRFLOW_HOME).mkdir(parents=True, exist_ok=True)
FERNET_KEY = Fernet.generate_key().decode()
with open(TEST_CONFIG_FILE, "w") as file:
cfg = _parameterized_config_from_template("default_test.cfg")
file.write(cfg)
make_group_other_inaccessible(TEST_CONFIG_FILE)
local_conf.load_test_config()
else:
# Load normal config
if not os.path.isfile(AIRFLOW_CONFIG):
from cryptography.fernet import Fernet
log.info("Creating new Airflow config file in: %s", AIRFLOW_CONFIG)
pathlib.Path(AIRFLOW_HOME).mkdir(parents=True, exist_ok=True)
FERNET_KEY = Fernet.generate_key().decode()
with open(AIRFLOW_CONFIG, "w") as file:
file.write(default_config)
make_group_other_inaccessible(AIRFLOW_CONFIG)
log.info("Reading the config from %s", AIRFLOW_CONFIG)
local_conf.read(AIRFLOW_CONFIG)
if local_conf.has_option("core", "AIRFLOW_HOME"):
msg = (
"Specifying both AIRFLOW_HOME environment variable and airflow_home "
"in the config file is deprecated. Please use only the AIRFLOW_HOME "
"environment variable and remove the config file entry."
)
if "AIRFLOW_HOME" in os.environ:
warnings.warn(msg, category=DeprecationWarning)
elif local_conf.get("core", "airflow_home") == AIRFLOW_HOME:
warnings.warn(
"Specifying airflow_home in the config file is deprecated. As you "
"have left it at the default value you should remove the setting "
"from your airflow.cfg and suffer no change in behaviour.",
category=DeprecationWarning,
)
else:
            # airflow_home is set in the config file and differs from the default,
            # so honor it (deprecated behaviour) and warn.
AIRFLOW_HOME = local_conf.get("core", "airflow_home") # type: ignore[assignment]
warnings.warn(msg, category=DeprecationWarning)
# They _might_ have set unit_test_mode in the airflow.cfg, we still
# want to respect that and then load the unittests.cfg
if local_conf.getboolean("core", "unit_test_mode"):
local_conf.load_test_config()
WEBSERVER_CONFIG = local_conf.get("webserver", "config_file")
if not os.path.isfile(WEBSERVER_CONFIG):
import shutil
log.info("Creating new FAB webserver config file in: %s", WEBSERVER_CONFIG)
shutil.copy(_default_config_file_path("default_webserver_config.py"), WEBSERVER_CONFIG)
return local_conf
def make_group_other_inaccessible(file_path: str):
try:
permissions = os.stat(file_path)
os.chmod(file_path, permissions.st_mode & (stat.S_IRUSR | stat.S_IWUSR))
except Exception as e:
log.warning(
"Could not change permissions of config file to be group/other inaccessible. "
"Continuing with original permissions:",
e,
)
# Historical convenience functions to access config entries
def load_test_config():
"""Historical load_test_config."""
warnings.warn(
"Accessing configuration method 'load_test_config' directly from the configuration module is "
"deprecated. Please access the configuration from the 'configuration.conf' object via "
"'conf.load_test_config'",
DeprecationWarning,
stacklevel=2,
)
conf.load_test_config()
def get(*args, **kwargs) -> ConfigType | None:
"""Historical get."""
warnings.warn(
"Accessing configuration method 'get' directly from the configuration module is "
"deprecated. Please access the configuration from the 'configuration.conf' object via "
"'conf.get'",
DeprecationWarning,
stacklevel=2,
)
return conf.get(*args, **kwargs)
def getboolean(*args, **kwargs) -> bool:
"""Historical getboolean."""
warnings.warn(
"Accessing configuration method 'getboolean' directly from the configuration module is "
"deprecated. Please access the configuration from the 'configuration.conf' object via "
"'conf.getboolean'",
DeprecationWarning,
stacklevel=2,
)
return conf.getboolean(*args, **kwargs)
def getfloat(*args, **kwargs) -> float:
"""Historical getfloat."""
warnings.warn(
"Accessing configuration method 'getfloat' directly from the configuration module is "
"deprecated. Please access the configuration from the 'configuration.conf' object via "
"'conf.getfloat'",
DeprecationWarning,
stacklevel=2,
)
return conf.getfloat(*args, **kwargs)
def getint(*args, **kwargs) -> int:
"""Historical getint."""
warnings.warn(
"Accessing configuration method 'getint' directly from the configuration module is "
"deprecated. Please access the configuration from the 'configuration.conf' object via "
"'conf.getint'",
DeprecationWarning,
stacklevel=2,
)
return conf.getint(*args, **kwargs)
def getsection(*args, **kwargs) -> ConfigOptionsDictType | None:
"""Historical getsection."""
warnings.warn(
"Accessing configuration method 'getsection' directly from the configuration module is "
"deprecated. Please access the configuration from the 'configuration.conf' object via "
"'conf.getsection'",
DeprecationWarning,
stacklevel=2,
)
return conf.getsection(*args, **kwargs)
def has_option(*args, **kwargs) -> bool:
"""Historical has_option."""
warnings.warn(
"Accessing configuration method 'has_option' directly from the configuration module is "
"deprecated. Please access the configuration from the 'configuration.conf' object via "
"'conf.has_option'",
DeprecationWarning,
stacklevel=2,
)
return conf.has_option(*args, **kwargs)
def remove_option(*args, **kwargs) -> bool:
"""Historical remove_option."""
warnings.warn(
"Accessing configuration method 'remove_option' directly from the configuration module is "
"deprecated. Please access the configuration from the 'configuration.conf' object via "
"'conf.remove_option'",
DeprecationWarning,
stacklevel=2,
)
return conf.remove_option(*args, **kwargs)
def as_dict(*args, **kwargs) -> ConfigSourcesType:
"""Historical as_dict."""
warnings.warn(
"Accessing configuration method 'as_dict' directly from the configuration module is "
"deprecated. Please access the configuration from the 'configuration.conf' object via "
"'conf.as_dict'",
DeprecationWarning,
stacklevel=2,
)
return conf.as_dict(*args, **kwargs)
def set(*args, **kwargs) -> None:
"""Historical set."""
warnings.warn(
"Accessing configuration method 'set' directly from the configuration module is "
"deprecated. Please access the configuration from the 'configuration.conf' object via "
"'conf.set'",
DeprecationWarning,
stacklevel=2,
)
conf.set(*args, **kwargs)
def ensure_secrets_loaded() -> list[BaseSecretsBackend]:
"""
Ensure that all secrets backends are loaded.
If the secrets_backend_list contains only 2 default backends, reload it.
"""
# Check if the secrets_backend_list contains only 2 default backends
if len(secrets_backend_list) == 2:
return initialize_secrets_backends()
return secrets_backend_list
def get_custom_secret_backend() -> BaseSecretsBackend | None:
"""Get Secret Backend if defined in airflow.cfg."""
secrets_backend_cls = conf.getimport(section="secrets", key="backend")
if not secrets_backend_cls:
return None
try:
backend_kwargs = conf.getjson(section="secrets", key="backend_kwargs")
if not backend_kwargs:
backend_kwargs = {}
elif not isinstance(backend_kwargs, dict):
raise ValueError("not a dict")
except AirflowConfigException:
log.warning("Failed to parse [secrets] backend_kwargs as JSON, defaulting to no kwargs.")
backend_kwargs = {}
except ValueError:
log.warning("Failed to parse [secrets] backend_kwargs into a dict, defaulting to no kwargs.")
backend_kwargs = {}
return secrets_backend_cls(**backend_kwargs)
def initialize_secrets_backends() -> list[BaseSecretsBackend]:
"""
Initialize secrets backend.
* import secrets backend classes
* instantiate them and return them in a list
"""
backend_list = []
custom_secret_backend = get_custom_secret_backend()
if custom_secret_backend is not None:
backend_list.append(custom_secret_backend)
for class_name in DEFAULT_SECRETS_SEARCH_PATH:
secrets_backend_cls = import_string(class_name)
backend_list.append(secrets_backend_cls())
return backend_list
@functools.lru_cache(maxsize=None)
def _DEFAULT_CONFIG() -> str:
path = _default_config_file_path("default_airflow.cfg")
with open(path) as fh:
return fh.read()
@functools.lru_cache(maxsize=None)
def _TEST_CONFIG() -> str:
path = _default_config_file_path("default_test.cfg")
with open(path) as fh:
return fh.read()
_deprecated = {
"DEFAULT_CONFIG": _DEFAULT_CONFIG,
"TEST_CONFIG": _TEST_CONFIG,
"TEST_CONFIG_FILE_PATH": functools.partial(_default_config_file_path, "default_test.cfg"),
"DEFAULT_CONFIG_FILE_PATH": functools.partial(_default_config_file_path, "default_airflow.cfg"),
}
def __getattr__(name):
if name in _deprecated:
warnings.warn(
f"{__name__}.{name} is deprecated and will be removed in future",
DeprecationWarning,
stacklevel=2,
)
return _deprecated[name]()
raise AttributeError(f"module {__name__} has no attribute {name}")
# Setting AIRFLOW_HOME and AIRFLOW_CONFIG from environment variables, using
# "~/airflow" and "$AIRFLOW_HOME/airflow.cfg" respectively as defaults.
AIRFLOW_HOME = get_airflow_home()
AIRFLOW_CONFIG = get_airflow_config(AIRFLOW_HOME)
# Set up dags folder for unit tests
# this directory won't exist if users install via pip
_TEST_DAGS_FOLDER = os.path.join(
os.path.dirname(os.path.dirname(os.path.realpath(__file__))), "tests", "dags"
)
if os.path.exists(_TEST_DAGS_FOLDER):
TEST_DAGS_FOLDER = _TEST_DAGS_FOLDER
else:
TEST_DAGS_FOLDER = os.path.join(AIRFLOW_HOME, "dags")
# Set up plugins folder for unit tests
_TEST_PLUGINS_FOLDER = os.path.join(
os.path.dirname(os.path.dirname(os.path.realpath(__file__))), "tests", "plugins"
)
if os.path.exists(_TEST_PLUGINS_FOLDER):
TEST_PLUGINS_FOLDER = _TEST_PLUGINS_FOLDER
else:
TEST_PLUGINS_FOLDER = os.path.join(AIRFLOW_HOME, "plugins")
TEST_CONFIG_FILE = get_airflow_test_config(AIRFLOW_HOME)
SECRET_KEY = b64encode(os.urandom(16)).decode("utf-8")
FERNET_KEY = "" # Set only if needed when generating a new file
WEBSERVER_CONFIG = "" # Set by initialize_config
conf = initialize_config()
secrets_backend_list = initialize_secrets_backends()
conf.validate()