| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| from __future__ import annotations |
| |
| import contextlib |
| import datetime |
| import functools |
| import itertools |
| import json |
| import logging |
| import multiprocessing |
| import os |
| import pathlib |
| import shlex |
| import stat |
| import subprocess |
| import sys |
| import warnings |
| from base64 import b64encode |
| from configparser import ConfigParser, NoOptionError, NoSectionError |
| from contextlib import contextmanager |
| from copy import deepcopy |
| from io import StringIO |
| from json.decoder import JSONDecodeError |
| from typing import IO, TYPE_CHECKING, Any, Dict, Generator, Iterable, Pattern, Set, Tuple, Union |
| from urllib.parse import urlsplit |
| |
| import re2 |
| from packaging.version import parse as parse_version |
| from typing_extensions import overload |
| |
| from airflow.exceptions import AirflowConfigException |
| from airflow.secrets import DEFAULT_SECRETS_SEARCH_PATH |
| from airflow.utils import yaml |
| from airflow.utils.empty_set import _get_empty_set_for_configuration |
| from airflow.utils.module_loading import import_string |
| from airflow.utils.providers_configuration_loader import providers_configuration_loaded |
| from airflow.utils.weight_rule import WeightRule |
| |
| if TYPE_CHECKING: |
| from airflow.auth.managers.base_auth_manager import BaseAuthManager |
| from airflow.secrets import BaseSecretsBackend |
| |
# Module-level logger used by the configuration code in this file.
log = logging.getLogger(__name__)

# show Airflow's deprecation warnings
# The filters are installed only when ``sys.warnoptions`` is empty, i.e. when no
# warning options were passed to the interpreter.
if not sys.warnoptions:
    warnings.filterwarnings(action="default", category=DeprecationWarning, module="airflow")
    warnings.filterwarnings(action="default", category=PendingDeprecationWarning, module="airflow")

# Captures the leading dotted-numeric prefix of a string (e.g. "3.15.2") in the
# named group ``version``; consumed by ``_parse_sqlite_version``.
_SQLITE3_VERSION_PATTERN = re2.compile(r"(?P<version>^\d+(?:\.\d+)*)\D?.*$")

# Type aliases describing configuration values and their bookkeeping structures.
# A single configuration value.
ConfigType = Union[str, int, float, bool]
# Option name -> value.
ConfigOptionsDictType = Dict[str, ConfigType]
# Option name -> value, or a 2-tuple (presumably (value, source) - ``_write_option_header``
# reads index 1 as the source; TODO confirm against ``as_dict``).
ConfigSectionSourcesType = Dict[str, Union[str, Tuple[str, str]]]
# Section name -> per-section mapping described above.
ConfigSourcesType = Dict[str, ConfigSectionSourcesType]

# Prefix shared by environment variable names of the form ``AIRFLOW__{SECTION}__{OPTION}``.
ENV_VAR_PREFIX = "AIRFLOW__"
| |
| |
def _parse_sqlite_version(s: str) -> tuple[int, ...]:
    """
    Convert the leading dotted-numeric part of a version string into a tuple of ints.

    :param s: version string to parse
    :return: tuple with one int per dot-separated component, or an empty tuple when
        the module-level version pattern does not match ``s``
    """
    found = _SQLITE3_VERSION_PATTERN.match(s)
    if found is not None:
        return tuple(map(int, found.group("version").split(".")))
    return ()
| |
| |
@overload
def expand_env_var(env_var: None) -> None: ...


@overload
def expand_env_var(env_var: str) -> str: ...


def expand_env_var(env_var: str | None) -> str | None:
    """
    Expand environment variables and ``~`` in a string until it stops changing.

    ``os.path.expandvars`` followed by ``os.path.expanduser`` is applied repeatedly,
    so variables whose values themselves contain variables are resolved too.

    :param env_var: value to expand; falsy values (``None``, ``""``) are returned untouched
    :return: the fully expanded string
    """
    if not env_var:
        return env_var
    current = env_var
    expanded = os.path.expanduser(os.path.expandvars(str(current)))
    # Keep expanding until a pass leaves the value unchanged (fixed point).
    while expanded != current:
        current = expanded
        expanded = os.path.expanduser(os.path.expandvars(str(current)))
    return expanded
| |
| |
def run_command(command: str) -> str:
    """
    Run a command (without a shell) and return what it wrote to stdout.

    :param command: command line; it is tokenised with ``shlex.split`` before execution
    :return: decoded standard output of the command
    :raises AirflowConfigException: when the command exits with a non-zero return code
    """
    process = subprocess.Popen(
        shlex.split(command), stdout=subprocess.PIPE, stderr=subprocess.PIPE, close_fds=True
    )
    raw_stdout, raw_stderr = process.communicate()
    # Undecodable bytes are dropped rather than failing the whole call.
    encoding = sys.getdefaultencoding()
    output = raw_stdout.decode(encoding, "ignore")
    stderr = raw_stderr.decode(encoding, "ignore")

    if process.returncode == 0:
        return output

    raise AirflowConfigException(
        f"Cannot execute {command}. Error code is: {process.returncode}. "
        f"Output: {output}, Stderr: {stderr}"
    )
| |
| |
def _get_config_value_from_secret_backend(config_key: str) -> str | None:
    """
    Get Config option values from Secret Backend.

    :param config_key: key passed to the backend's ``get_config``
    :return: value returned by the backend, or ``None`` when no custom secrets
        backend is available
    :raises AirflowConfigException: when obtaining the backend or reading the value
        fails; the original exception is attached as the cause
    """
    try:
        secrets_client = get_custom_secret_backend()
        if not secrets_client:
            return None
        return secrets_client.get_config(config_key)
    except Exception as e:
        # Chain explicitly so the traceback shows the backend failure as the direct cause.
        raise AirflowConfigException(
            "Cannot retrieve config from alternative secrets backend. "
            "Make sure it is configured properly and that the Backend "
            "is accessible.\n"
            f"{e}"
        ) from e
| |
| |
| def _is_template(configuration_description: dict[str, dict[str, Any]], section: str, key: str) -> bool: |
| """ |
| Check if the config is a template. |
| |
| :param configuration_description: description of configuration |
| :param section: section |
| :param key: key |
| :return: True if the config is a template |
| """ |
| return configuration_description.get(section, {}).get(key, {}).get("is_template", False) |
| |
| |
| def _default_config_file_path(file_name: str) -> str: |
| templates_dir = os.path.join(os.path.dirname(__file__), "config_templates") |
| return os.path.join(templates_dir, file_name) |
| |
| |
def retrieve_configuration_description(
    include_airflow: bool = True,
    include_providers: bool = True,
    selected_provider: str | None = None,
) -> dict[str, dict[str, Any]]:
    """
    Assemble the configuration description from the bundled YAML and from providers.

    :param include_airflow: load the core description from the bundled ``config.yml``
    :param include_providers: merge in the descriptions exposed by ``ProvidersManager``
    :param selected_provider: when set, only this provider's description is merged
    :return: dictionary mapping section names to their descriptions
    """
    description: dict[str, dict[str, Any]] = {}
    if include_airflow:
        with open(_default_config_file_path("config.yml")) as config_file:
            description.update(yaml.safe_load(config_file))
    if not include_providers:
        return description

    # Imported lazily, only when provider descriptions are actually requested.
    from airflow.providers_manager import ProvidersManager

    for provider, config in ProvidersManager().provider_configs:
        if selected_provider and provider != selected_provider:
            continue
        description.update(config)
    return description
| |
| |
| class AirflowConfigParser(ConfigParser): |
| """ |
| Custom Airflow Configparser supporting defaults and deprecated options. |
| |
| This is a subclass of ConfigParser that supports defaults and deprecated options. |
| |
The defaults are stored in the ``_default_values`` ConfigParser. The configuration description keeps
a description of all the options available in Airflow (the description follows config.yaml.schema).
| |
| :param default_config: default configuration (in the form of ini file). |
| :param configuration_description: description of configuration to use |
| """ |
| |
def __init__(
    self,
    default_config: str | None = None,
    *args,
    **kwargs,
):
    """
    Initialise the parser with the core configuration description and its defaults.

    :param default_config: optional ini-formatted string; when given, its values are
        merged into the defaults through ``_update_defaults_from_string``
    :param args: positional arguments forwarded to ``ConfigParser.__init__``
    :param kwargs: keyword arguments forwarded to ``ConfigParser.__init__``
    """
    super().__init__(*args, **kwargs)
    # Only the core description is loaded here; provider descriptions are excluded.
    self.configuration_description = retrieve_configuration_description(include_providers=False)
    self.upgraded_values = {}
    # For those who would like to use a different data structure to keep defaults:
    # We have to keep the default values in a ConfigParser rather than in any other
    # data structure, because the values we have might contain %% which are ConfigParser
    # interpolation placeholders. The _default_values config parser will interpolate them
    # properly when we call get() on it.
    self._default_values = create_default_config_parser(self.configuration_description)
    self._pre_2_7_default_values = create_pre_2_7_defaults()
    if default_config is not None:
        self._update_defaults_from_string(default_config)
    # Must run after the defaults (including any override above) are in place,
    # because it reads the default ``log_filename_template``.
    self._update_logging_deprecated_template_to_one_from_defaults()
    self.is_validated = False
    self._suppress_future_warnings = False
    self._providers_configuration_loaded = False
| |
| def _update_logging_deprecated_template_to_one_from_defaults(self): |
| default = self.get_default_value("logging", "log_filename_template") |
| if default: |
| # Tuple does not support item assignment, so we have to create a new tuple and replace it |
| original_replacement = self.deprecated_values["logging"]["log_filename_template"] |
| self.deprecated_values["logging"]["log_filename_template"] = ( |
| original_replacement[0], |
| default, |
| original_replacement[2], |
| ) |
| |
| def is_template(self, section: str, key) -> bool: |
| """ |
| Return whether the value is templated. |
| |
| :param section: section of the config |
| :param key: key in the section |
| :return: True if the value is templated |
| """ |
| if self.configuration_description is None: |
| return False |
| return _is_template(self.configuration_description, section, key) |
| |
| def _update_defaults_from_string(self, config_string: str): |
| """ |
| Update the defaults in _default_values based on values in config_string ("ini" format). |
| |
| Note that those values are not validated and cannot contain variables because we are using |
| regular config parser to load them. This method is used to test the config parser in unit tests. |
| |
| :param config_string: ini-formatted config string |
| """ |
| parser = ConfigParser() |
| parser.read_string(config_string) |
| for section in parser.sections(): |
| if section not in self._default_values.sections(): |
| self._default_values.add_section(section) |
| errors = False |
| for key, value in parser.items(section): |
| if not self.is_template(section, key) and "{" in value: |
| errors = True |
| log.error( |
| "The %s.%s value %s read from string contains variable. This is not supported", |
| section, |
| key, |
| value, |
| ) |
| self._default_values.set(section, key, value) |
| if errors: |
| raise AirflowConfigException( |
| f"The string config passed as default contains variables. " |
| f"This is not supported. String config: {config_string}" |
| ) |
| |
| def get_default_value(self, section: str, key: str, fallback: Any = None, raw=False, **kwargs) -> Any: |
| """ |
| Retrieve default value from default config parser. |
| |
| This will retrieve the default value from the default config parser. Optionally a raw, stored |
| value can be retrieved by setting skip_interpolation to True. This is useful for example when |
| we want to write the default value to a file, and we don't want the interpolation to happen |
| as it is going to be done later when the config is read. |
| |
| :param section: section of the config |
| :param key: key to use |
| :param fallback: fallback value to use |
| :param raw: if raw, then interpolation will be reversed |
| :param kwargs: other args |
| :return: |
| """ |
| value = self._default_values.get(section, key, fallback=fallback, **kwargs) |
| if raw and value is not None: |
| return value.replace("%", "%%") |
| return value |
| |
| def get_default_pre_2_7_value(self, section: str, key: str, **kwargs) -> Any: |
| """Get pre 2.7 default config values.""" |
| return self._pre_2_7_default_values.get(section, key, fallback=None, **kwargs) |
| |
| # These configuration elements can be fetched as the stdout of commands |
| # following the "{section}__{name}_cmd" pattern, the idea behind this |
| # is to not store password on boxes in text files. |
| # These configs can also be fetched from Secrets backend |
| # following the "{section}__{name}__secret" pattern |
| @functools.cached_property |
| def sensitive_config_values(self) -> Set[tuple[str, str]]: # noqa: UP006 |
| if self.configuration_description is None: |
| return ( |
| _get_empty_set_for_configuration() |
| ) # we can't use set() here because set is defined below # ¯\_(ツ)_/¯ |
| flattened = { |
| (s, k): item |
| for s, s_c in self.configuration_description.items() |
| for k, item in s_c.get("options").items() # type: ignore[union-attr] |
| } |
| sensitive = { |
| (section.lower(), key.lower()) |
| for (section, key), v in flattened.items() |
| if v.get("sensitive") is True |
| } |
| depr_option = {self.deprecated_options[x][:-1] for x in sensitive if x in self.deprecated_options} |
| depr_section = { |
| (self.deprecated_sections[s][0], k) for s, k in sensitive if s in self.deprecated_sections |
| } |
| sensitive.update(depr_section, depr_option) |
| return sensitive |
| |
# A mapping of (new section, new option) -> (old section, old option, since_version).
# When reading new option, the old option will be checked to see if it exists. If it does a
# DeprecationWarning will be issued and the old option will be used instead.
# The reverse lookup (old name -> new name) is derived from this mapping by
# ``inversed_deprecated_options``.
deprecated_options: dict[tuple[str, str], tuple[str, str, str]] = {
    ("celery", "worker_precheck"): ("core", "worker_precheck", "2.0.0"),
    ("logging", "interleave_timestamp_parser"): ("core", "interleave_timestamp_parser", "2.6.1"),
    ("logging", "base_log_folder"): ("core", "base_log_folder", "2.0.0"),
    ("logging", "remote_logging"): ("core", "remote_logging", "2.0.0"),
    ("logging", "remote_log_conn_id"): ("core", "remote_log_conn_id", "2.0.0"),
    ("logging", "remote_base_log_folder"): ("core", "remote_base_log_folder", "2.0.0"),
    ("logging", "encrypt_s3_logs"): ("core", "encrypt_s3_logs", "2.0.0"),
    ("logging", "logging_level"): ("core", "logging_level", "2.0.0"),
    ("logging", "fab_logging_level"): ("core", "fab_logging_level", "2.0.0"),
    ("logging", "logging_config_class"): ("core", "logging_config_class", "2.0.0"),
    ("logging", "colored_console_log"): ("core", "colored_console_log", "2.0.0"),
    ("logging", "colored_log_format"): ("core", "colored_log_format", "2.0.0"),
    ("logging", "colored_formatter_class"): ("core", "colored_formatter_class", "2.0.0"),
    ("logging", "log_format"): ("core", "log_format", "2.0.0"),
    ("logging", "simple_log_format"): ("core", "simple_log_format", "2.0.0"),
    ("logging", "task_log_prefix_template"): ("core", "task_log_prefix_template", "2.0.0"),
    ("logging", "log_filename_template"): ("core", "log_filename_template", "2.0.0"),
    ("logging", "log_processor_filename_template"): ("core", "log_processor_filename_template", "2.0.0"),
    ("logging", "dag_processor_manager_log_location"): (
        "core",
        "dag_processor_manager_log_location",
        "2.0.0",
    ),
    ("logging", "task_log_reader"): ("core", "task_log_reader", "2.0.0"),
    ("metrics", "metrics_allow_list"): ("metrics", "statsd_allow_list", "2.6.0"),
    ("metrics", "metrics_block_list"): ("metrics", "statsd_block_list", "2.6.0"),
    ("metrics", "statsd_on"): ("scheduler", "statsd_on", "2.0.0"),
    ("metrics", "statsd_host"): ("scheduler", "statsd_host", "2.0.0"),
    ("metrics", "statsd_port"): ("scheduler", "statsd_port", "2.0.0"),
    ("metrics", "statsd_prefix"): ("scheduler", "statsd_prefix", "2.0.0"),
    ("metrics", "statsd_allow_list"): ("scheduler", "statsd_allow_list", "2.0.0"),
    ("metrics", "stat_name_handler"): ("scheduler", "stat_name_handler", "2.0.0"),
    ("metrics", "statsd_datadog_enabled"): ("scheduler", "statsd_datadog_enabled", "2.0.0"),
    ("metrics", "statsd_datadog_tags"): ("scheduler", "statsd_datadog_tags", "2.0.0"),
    ("metrics", "statsd_datadog_metrics_tags"): ("scheduler", "statsd_datadog_metrics_tags", "2.6.0"),
    ("metrics", "statsd_custom_client_path"): ("scheduler", "statsd_custom_client_path", "2.0.0"),
    ("scheduler", "parsing_processes"): ("scheduler", "max_threads", "1.10.14"),
    ("scheduler", "scheduler_idle_sleep_time"): ("scheduler", "processor_poll_interval", "2.2.0"),
    ("operators", "default_queue"): ("celery", "default_queue", "2.1.0"),
    ("core", "hide_sensitive_var_conn_fields"): ("admin", "hide_sensitive_variable_fields", "2.1.0"),
    ("core", "sensitive_var_conn_names"): ("admin", "sensitive_variable_fields", "2.1.0"),
    ("core", "default_pool_task_slot_count"): ("core", "non_pooled_task_slot_count", "1.10.4"),
    ("core", "max_active_tasks_per_dag"): ("core", "dag_concurrency", "2.2.0"),
    ("logging", "worker_log_server_port"): ("celery", "worker_log_server_port", "2.2.0"),
    ("api", "access_control_allow_origins"): ("api", "access_control_allow_origin", "2.2.0"),
    ("api", "auth_backends"): ("api", "auth_backend", "2.3.0"),
    ("database", "sql_alchemy_conn"): ("core", "sql_alchemy_conn", "2.3.0"),
    ("database", "sql_engine_encoding"): ("core", "sql_engine_encoding", "2.3.0"),
    ("database", "sql_engine_collation_for_ids"): ("core", "sql_engine_collation_for_ids", "2.3.0"),
    ("database", "sql_alchemy_pool_enabled"): ("core", "sql_alchemy_pool_enabled", "2.3.0"),
    ("database", "sql_alchemy_pool_size"): ("core", "sql_alchemy_pool_size", "2.3.0"),
    ("database", "sql_alchemy_max_overflow"): ("core", "sql_alchemy_max_overflow", "2.3.0"),
    ("database", "sql_alchemy_pool_recycle"): ("core", "sql_alchemy_pool_recycle", "2.3.0"),
    ("database", "sql_alchemy_pool_pre_ping"): ("core", "sql_alchemy_pool_pre_ping", "2.3.0"),
    ("database", "sql_alchemy_schema"): ("core", "sql_alchemy_schema", "2.3.0"),
    ("database", "sql_alchemy_connect_args"): ("core", "sql_alchemy_connect_args", "2.3.0"),
    ("database", "load_default_connections"): ("core", "load_default_connections", "2.3.0"),
    ("database", "max_db_retries"): ("core", "max_db_retries", "2.3.0"),
    ("scheduler", "parsing_cleanup_interval"): ("scheduler", "deactivate_stale_dags_interval", "2.5.0"),
    ("scheduler", "task_queued_timeout_check_interval"): (
        "kubernetes_executor",
        "worker_pods_pending_timeout_check_interval",
        "2.6.0",
    ),
}
| |
# A mapping of a new configuration to the list of old configurations it replaces, for the case
# where one configuration deprecates more than one other configuration. Keys are
# (new section, new option); each list entry is (old section, old option, since_version).
# The deprecation logic for these configurations is defined in SchedulerJobRunner.
many_to_one_deprecated_options: dict[tuple[str, str], list[tuple[str, str, str]]] = {
    ("scheduler", "task_queued_timeout"): [
        ("celery", "stalled_task_timeout", "2.6.0"),
        ("celery", "task_adoption_timeout", "2.6.0"),
        ("kubernetes_executor", "worker_pods_pending_timeout", "2.6.0"),
    ]
}
| |
# A mapping of new section -> (old section, since_version).
# The reverse lookup (old section -> new section) is derived by ``inversed_deprecated_sections``.
deprecated_sections: dict[str, tuple[str, str]] = {"kubernetes_executor": ("kubernetes", "2.5.0")}
| |
| # Now build the inverse so we can go from old_section/old_key to new_section/new_key |
| # if someone tries to retrieve it based on old_section/old_key |
| @functools.cached_property |
| def inversed_deprecated_options(self): |
| return {(sec, name): key for key, (sec, name, ver) in self.deprecated_options.items()} |
| |
| @functools.cached_property |
| def inversed_deprecated_sections(self): |
| return { |
| old_section: new_section for new_section, (old_section, ver) in self.deprecated_sections.items() |
| } |
| |
# A mapping of old default values that we want to change and warn the user
# about. Mapping of section -> setting -> (old, replace, by_version), where ``old`` is a
# compiled pattern matched against the current value, ``replace`` is the replacement passed to
# ``old.sub`` and ``by_version`` is the version reported in the warning (see ``validate``).
deprecated_values: dict[str, dict[str, tuple[Pattern, str, str]]] = {
    "core": {
        "hostname_callable": (re2.compile(r":"), r".", "2.1"),
    },
    "webserver": {
        "navbar_color": (re2.compile(r"(?i)\A#007A87\z"), "#fff", "2.1"),
        "dag_default_view": (re2.compile(r"^tree$"), "grid", "3.0"),
    },
    "email": {
        "email_backend": (
            re2.compile(r"^airflow\.contrib\.utils\.sendgrid\.send_email$"),
            r"airflow.providers.sendgrid.utils.emailer.send_email",
            "2.1",
        ),
    },
    "logging": {
        "log_filename_template": (
            re2.compile(re2.escape("{{ ti.dag_id }}/{{ ti.task_id }}/{{ ts }}/{{ try_number }}.log")),
            # The actual replacement value will be updated after defaults are loaded from config.yml
            "XX-set-after-default-config-loaded-XX",
            "3.0",
        ),
    },
    "api": {
        "auth_backends": (
            re2.compile(r"^airflow\.api\.auth\.backend\.deny_all$|^$"),
            "airflow.api.auth.backend.session",
            "3.0",
        ),
    },
    "elasticsearch": {
        "log_id_template": (
            re2.compile("^" + re2.escape("{dag_id}-{task_id}-{execution_date}-{try_number}") + "$"),
            "{dag_id}-{task_id}-{run_id}-{map_index}-{try_number}",
            "3.0",
        )
    },
}
| |
# Level names accepted for the logging-level options listed in ``enums_options``.
_available_logging_levels = ["CRITICAL", "FATAL", "ERROR", "WARN", "WARNING", "INFO", "DEBUG"]
# (section, option) -> list of accepted values; checked by ``_validate_enums``.
enums_options = {
    ("core", "default_task_weight_rule"): sorted(WeightRule.all_weight_rules()),
    ("core", "dag_ignore_file_syntax"): ["regexp", "glob"],
    ("core", "mp_start_method"): multiprocessing.get_all_start_methods(),
    ("scheduler", "file_parsing_sort_mode"): ["modified_time", "random_seeded_by_host", "alphabetical"],
    ("logging", "logging_level"): _available_logging_levels,
    ("logging", "fab_logging_level"): _available_logging_levels,
    # celery_logging_level can be empty, which uses logging_level as fallback
    ("logging", "celery_logging_level"): [*_available_logging_levels, ""],
    ("webserver", "analytical_tool"): ["google_analytics", "metarouter", "segment", "matomo", ""],
}

# Declared here for typing only; the dict itself is created per instance in ``__init__``
# and filled in by ``validate`` and the ``_upgrade_*`` helpers.
upgraded_values: dict[tuple[str, str], str]
"""Mapping of (section,option) to the old value that was upgraded"""
| |
| def get_sections_including_defaults(self) -> list[str]: |
| """ |
| Retrieve all sections from the configuration parser, including sections defined by built-in defaults. |
| |
| :return: list of section names |
| """ |
| return list(dict.fromkeys(itertools.chain(self.configuration_description, self.sections()))) |
| |
| def get_options_including_defaults(self, section: str) -> list[str]: |
| """ |
| Retrieve all possible option from the configuration parser for the section given. |
| |
| Includes options defined by built-in defaults. |
| |
| :return: list of option names for the section given |
| """ |
| my_own_options = self.options(section) if self.has_section(section) else [] |
| all_options_from_defaults = self.configuration_description.get(section, {}).get("options", {}) |
| return list(dict.fromkeys(itertools.chain(all_options_from_defaults, my_own_options))) |
| |
def optionxform(self, optionstr: str) -> str:
    """
    Return the option name unchanged.

    ``ConfigParser`` calls this hook on every read, get, or set operation and by
    default lower-cases the name; this override keeps option names case-preserving.

    :param optionstr: option name as given
    :return: the same option name, untouched
    """
    return optionstr
| |
| @contextmanager |
| def make_sure_configuration_loaded(self, with_providers: bool) -> Generator[None, None, None]: |
| """ |
| Make sure configuration is loaded with or without providers. |
| |
| This happens regardless if the provider configuration has been loaded before or not. |
| Restores configuration to the state before entering the context. |
| |
| :param with_providers: whether providers should be loaded |
| """ |
| reload_providers_when_leaving = False |
| if with_providers and not self._providers_configuration_loaded: |
| # make sure providers are initialized |
| from airflow.providers_manager import ProvidersManager |
| |
| # run internal method to initialize providers configuration in ordered to not trigger the |
| # initialize_providers_configuration cache (because we will be unloading it now |
| ProvidersManager()._initialize_providers_configuration() |
| elif not with_providers and self._providers_configuration_loaded: |
| reload_providers_when_leaving = True |
| self.restore_core_default_configuration() |
| yield |
| if reload_providers_when_leaving: |
| self.load_providers_configuration() |
| |
| @staticmethod |
| def _write_section_header( |
| file: IO[str], |
| include_descriptions: bool, |
| section_config_description: dict[str, str], |
| section_to_write: str, |
| ) -> None: |
| """Write header for configuration section.""" |
| file.write(f"[{section_to_write}]\n") |
| section_description = section_config_description.get("description") |
| if section_description and include_descriptions: |
| for line in section_description.splitlines(): |
| file.write(f"# {line}\n") |
| file.write("\n") |
| |
def _write_option_header(
    self,
    file: IO[str],
    option: str,
    extra_spacing: bool,
    include_descriptions: bool,
    include_env_vars: bool,
    include_examples: bool,
    include_sources: bool,
    section_config_description: dict[str, dict[str, Any]],
    section_to_write: str,
    sources_dict: ConfigSourcesType,
) -> tuple[bool, bool]:
    """
    Write the comment lines (description, example, source, env variable) preceding an option.

    Nothing is written and ``(False, False)`` is returned when the option's ``version_added``
    is newer than the base version of the running Airflow.

    :return: tuple of (should_continue, needs_separation) where needs_separation is set
        when the option needs an extra blank line to visually separate it from the next one
    """
    from airflow import __version__ as airflow_version

    if section_config_description:
        option_config_description = section_config_description.get("options", {}).get(option, {})
    else:
        option_config_description = {}

    version_added = option_config_description.get("version_added")
    if version_added is not None:
        # Compare against the base version so pre-release/dev suffixes are ignored.
        current_release = parse_version(parse_version(airflow_version).base_version)
        if parse_version(version_added) > current_release:
            # skip if option is going to be added in the future version
            return False, False

    needs_separation = False

    description = option_config_description.get("description")
    if description and include_descriptions:
        file.writelines(f"# {line}\n" for line in description.splitlines())
        needs_separation = True

    example = option_config_description.get("example")
    if example is not None and include_examples:
        if extra_spacing:
            file.write("#\n")
        file.write(f"# Example: {option} = {example}\n")
        needs_separation = True

    if include_sources and sources_dict:
        sources_section = sources_dict.get(section_to_write)
        value_with_source = sources_section.get(option) if sources_section else None
        if value_with_source is not None:
            file.write(f"#\n# Source: {value_with_source[1]}\n")
        else:
            file.write("#\n# Source: not defined\n")
        needs_separation = True

    if include_env_vars:
        file.write(f"#\n# Variable: AIRFLOW__{section_to_write.upper()}__{option.upper()}\n")
        if extra_spacing:
            file.write("#\n")
        needs_separation = True

    return True, needs_separation
| |
| def _write_value( |
| self, |
| file: IO[str], |
| option: str, |
| comment_out_everything: bool, |
| needs_separation: bool, |
| only_defaults: bool, |
| section_to_write: str, |
| ): |
| if self._default_values is None: |
| default_value = None |
| else: |
| default_value = self.get_default_value(section_to_write, option, raw=True) |
| if only_defaults: |
| value = default_value |
| else: |
| value = self.get(section_to_write, option, fallback=default_value, raw=True) |
| if value is None: |
| file.write(f"# {option} = \n") |
| else: |
| if comment_out_everything: |
| file.write(f"# {option} = {value}\n") |
| else: |
| file.write(f"{option} = {value}\n") |
| if needs_separation: |
| file.write("\n") |
| |
def write(  # type: ignore[override]
    self,
    file: IO[str],
    section: str | None = None,
    include_examples: bool = True,
    include_descriptions: bool = True,
    include_sources: bool = True,
    include_env_vars: bool = True,
    include_providers: bool = True,
    comment_out_everything: bool = False,
    hide_sensitive_values: bool = False,
    extra_spacing: bool = True,
    only_defaults: bool = False,
    **kwargs: Any,
) -> None:
    """
    Write configuration with comments and examples to a file.

    Options whose header reports that they should be skipped (options that are only
    going to be added in a future version) are omitted entirely, value included.

    :param file: file to write to
    :param section: section of the config to write, defaults to all sections
    :param include_examples: Include examples in the output
    :param include_descriptions: Include descriptions in the output
    :param include_sources: Include the source of each config option
    :param include_env_vars: Include environment variables corresponding to each config option
    :param include_providers: Include providers configuration
    :param comment_out_everything: Comment out all values
    :param hide_sensitive_values: Hide sensitive values in the output (accepted for interface
        compatibility; not applied by this method)
    :param extra_spacing: Add extra spacing before examples and after variables
    :param only_defaults: Only include default values when writing the config, not the actual values
    :param kwargs: accepted for interface compatibility; not used by this method
    :raises RuntimeError: when no default values or no configuration description is set
    """
    sources_dict = {}
    if include_sources:
        sources_dict = self.as_dict(display_source=True)
    if self._default_values is None:
        raise RuntimeError("Cannot write default config, no default config set")
    if self.configuration_description is None:
        raise RuntimeError("Cannot write default config, no default configuration description set")
    with self.make_sure_configuration_loaded(with_providers=include_providers):
        for section_to_write in self.get_sections_including_defaults():
            section_config_description = self.configuration_description.get(section_to_write, {})
            if section_to_write != section and section is not None:
                continue
            if self._default_values.has_section(section_to_write) or self.has_section(section_to_write):
                self._write_section_header(
                    file, include_descriptions, section_config_description, section_to_write
                )
                # Reset per section so that a section without options neither fails on an
                # unbound name nor reuses the flag left over from the previous section.
                needs_separation = False
                for option in self.get_options_including_defaults(section_to_write):
                    should_continue, needs_separation = self._write_option_header(
                        file=file,
                        option=option,
                        extra_spacing=extra_spacing,
                        include_descriptions=include_descriptions,
                        include_env_vars=include_env_vars,
                        include_examples=include_examples,
                        include_sources=include_sources,
                        section_config_description=section_config_description,
                        section_to_write=section_to_write,
                        sources_dict=sources_dict,
                    )
                    if not should_continue:
                        # The header asked to skip this option, so do not write its value either.
                        continue
                    self._write_value(
                        file=file,
                        option=option,
                        comment_out_everything=comment_out_everything,
                        needs_separation=needs_separation,
                        only_defaults=only_defaults,
                        section_to_write=section_to_write,
                    )
                if include_descriptions and not needs_separation:
                    # extra separation between sections in case last option did not need it
                    file.write("\n")
| |
def restore_core_default_configuration(self) -> None:
    """Reset the configuration description and default values to core Airflow only.

    Provider configuration is not restored by this method; call
    ``load_providers_configuration`` afterwards if it is needed again.
    """
    core_description = retrieve_configuration_description(include_providers=False)
    self.configuration_description = core_description
    self._default_values = create_default_config_parser(core_description)
    self._providers_configuration_loaded = False
| |
def validate(self):
    """
    Validate the configuration and upgrade values that still use deprecated defaults.

    For every entry of ``deprecated_values`` whose current value is reported as old, the
    old value is remembered in ``upgraded_values``, the substituted value is pushed through
    ``_update_env_var`` and a future warning is created. Sets ``is_validated`` when done.
    """
    self._validate_sqlite3_version()
    self._validate_enums()

    for section, replacement in self.deprecated_values.items():
        for name, (old, new, version) in replacement.items():
            current_value = self.get(section, name, fallback="")
            if not self._using_old_value(old, current_value):
                continue
            self.upgraded_values[(section, name)] = current_value
            new_value = old.sub(new, current_value)
            self._update_env_var(section=section, name=name, new_value=new_value)
            self._create_future_warning(
                name=name,
                section=section,
                current_value=current_value,
                new_value=new_value,
                version=version,
            )

    self._upgrade_auth_backends()
    self._upgrade_postgres_metastore_conn()
    self.is_validated = True
| |
def _upgrade_auth_backends(self):
    """
    Append the session backend to a custom ``[api] auth_backends`` value.

    The UI needs the session backend for its ajax queries, so it is added
    to the running config when a custom setting omits it.
    """
    session_backend = "airflow.api.auth.backend.session"
    old_value = self.get("api", "auth_backends", fallback="")
    # The default backend and an empty value are handled by deprecated_values
    if old_value in ("airflow.api.auth.backend.default", ""):
        return
    if session_backend in old_value:
        return

    new_value = old_value + "," + session_backend
    self._update_env_var(section="api", name="auth_backends", new_value=new_value)
    self.upgraded_values[("api", "auth_backends")] = old_value

    # A value set through the old (singular) env var would otherwise
    # "win" over the adjusted value, so it has to be wiped.
    os.environ.pop(self._env_var_name("api", "auth_backend"), None)

    warnings.warn(
        "The auth_backends setting in [api] has had airflow.api.auth.backend.session added "
        "in the running config, which is needed by the UI. Please update your config before "
        "Apache Airflow 3.0.",
        FutureWarning,
        stacklevel=1,
    )
| |
def _upgrade_postgres_metastore_conn(self):
    """
    Rewrite legacy Postgres URL schemes in ``[database] sql_alchemy_conn``.

    As of SQLAlchemy 1.4, schemes `postgres+psycopg2` and `postgres`
    must be replaced with `postgresql`.
    """
    section, key = "database", "sql_alchemy_conn"
    good_scheme = "postgresql"
    old_value = self.get(section, key, _extra_stacklevel=1)
    parsed = urlsplit(old_value)
    if parsed.scheme not in ("postgres+psycopg2", "postgres"):
        return

    warnings.warn(
        f"Bad scheme in Airflow configuration core > sql_alchemy_conn: `{parsed.scheme}`. "
        "As of SQLAlchemy 1.4 (adopted in Airflow 2.3) this is no longer supported. You must "
        f"change to `{good_scheme}` before the next Airflow release.",
        FutureWarning,
        stacklevel=1,
    )
    self.upgraded_values[(section, key)] = old_value
    # Only the leading "<scheme>://" is swapped; the rest of the URL is kept verbatim
    scheme_prefix = "^" + re2.escape(f"{parsed.scheme}://")
    new_value = re2.sub(scheme_prefix, f"{good_scheme}://", old_value)
    self._update_env_var(section=section, name=key, new_value=new_value)

    # A value set through the old [core] env var would otherwise
    # "win" over the adjusted value, so it has to be wiped.
    os.environ.pop(self._env_var_name("core", key), None)
| |
def _validate_enums(self):
    """Check that every enum-typed option which is set holds one of its accepted values.

    :raises AirflowConfigException: if a non-empty value is outside the accepted options
    """
    for (section_key, option_key), enum_options in self.enums_options.items():
        if not self.has_option(section_key, option_key):
            continue
        value = self.get(section_key, option_key, fallback=None)
        # Empty values are not validated
        if not value or value in enum_options:
            continue
        raise AirflowConfigException(
            f"`[{section_key}] {option_key}` should not be "
            f"{value!r}. Possible values: {', '.join(enum_options)}."
        )
| |
def _validate_sqlite3_version(self):
    """Check the SQLite library version when the metadata DB connection mentions sqlite.

    Some features in storing rendered fields require SQLite >= 3.15.0.

    :raises AirflowConfigException: if the linked SQLite library is older than that
    """
    sql_alchemy_conn = self.get("database", "sql_alchemy_conn")
    if "sqlite" not in sql_alchemy_conn:
        return

    import sqlite3

    min_sqlite_version = (3, 15, 0)
    if _parse_sqlite_version(sqlite3.sqlite_version) < min_sqlite_version:
        from airflow.utils.docs import get_docs_url

        min_sqlite_version_str = ".".join(map(str, min_sqlite_version))
        raise AirflowConfigException(
            f"error: SQLite C library too old (< {min_sqlite_version_str}). "
            f"See {get_docs_url('howto/set-up-database.html#setting-up-a-sqlite-database')}"
        )
| |
def _using_old_value(self, old: Pattern, current_value: str) -> bool:
    """Return True when ``current_value`` matches the deprecated pattern ``old`` anywhere."""
    match = old.search(current_value)
    return match is not None
| |
def _update_env_var(self, section: str, name: str, new_value: str):
    """Store ``new_value`` in the environment variable that maps to ``section``/``name``."""
    # Exported through os.environ so that any subprocesses keep the same override!
    os.environ[self._env_var_name(section, name)] = new_value
| |
| @staticmethod |
| def _create_future_warning(name: str, section: str, current_value: Any, new_value: Any, version: str): |
| warnings.warn( |
| f"The {name!r} setting in [{section}] has the old default value of {current_value!r}. " |
| f"This value has been changed to {new_value!r} in the running config, but " |
| f"please update your config before Apache Airflow {version}.", |
| FutureWarning, |
| stacklevel=3, |
| ) |
| |
def _env_var_name(self, section: str, key: str) -> str:
    """Build the environment variable name for ``section``/``key``.

    The name is ``ENV_VAR_PREFIX`` + upper-cased section (dots replaced by
    underscores) + ``__`` + upper-cased key.
    """
    env_section = section.replace(".", "_").upper()
    env_key = key.upper()
    return f"{ENV_VAR_PREFIX}{env_section}__{env_key}"
| |
def _get_env_var_option(self, section: str, key: str):
    """Look an option up in the environment.

    Three variable names are tried in order: the plain variable, the
    ``_CMD`` variant (a command to run) and the ``_SECRET`` variant (a path
    in the secrets backend). The last two are only honoured for options
    listed in ``sensitive_config_values``.

    :return: the resolved value, or None when nothing applicable is set
    """
    # must have format AIRFLOW__{SECTION}__{KEY} (note double underscore)
    env_var = self._env_var_name(section, key)
    if env_var in os.environ:
        return expand_env_var(os.environ[env_var])

    # alternatively AIRFLOW__{SECTION}__{KEY}_CMD (for a command)
    env_var_cmd = env_var + "_CMD"
    if env_var_cmd in os.environ and (section, key) in self.sensitive_config_values:
        return run_command(os.environ[env_var_cmd])

    # alternatively AIRFLOW__{SECTION}__{KEY}_SECRET (to get from Secrets Backend)
    env_var_secret_path = env_var + "_SECRET"
    if env_var_secret_path in os.environ and (section, key) in self.sensitive_config_values:
        return _get_config_value_from_secret_backend(os.environ[env_var_secret_path])

    return None
| |
def _get_cmd_option(self, section: str, key: str):
    """Run the ``<key>_cmd`` command from the config file for a sensitive option.

    :return: the command result, or None if the option is not sensitive or
        no ``<key>_cmd`` entry exists
    """
    if (section, key) not in self.sensitive_config_values:
        return None
    fallback_key = key + "_cmd"
    if not super().has_option(section, fallback_key):
        return None
    return run_command(super().get(section, fallback_key))
| |
def _get_cmd_option_from_config_sources(
    self, config_sources: ConfigSourcesType, section: str, key: str
) -> str | None:
    """Run the ``<key>_cmd`` command found in ``config_sources`` for a sensitive option.

    The stored entry may be either the command string itself or a sequence
    whose first element is the command.

    :return: the command result, or None when there is nothing to run
    """
    if (section, key) not in self.sensitive_config_values:
        return None
    section_dict = config_sources.get(section)
    if section_dict is None:
        return None
    command_value = section_dict.get(key + "_cmd")
    if command_value is None:
        return None
    command = command_value if isinstance(command_value, str) else command_value[0]
    return run_command(command)
| |
def _get_secret_option(self, section: str, key: str) -> str | None:
    """Resolve a sensitive option through the ``<key>_secret`` entry of the config file.

    :return: the value fetched from the secrets backend, or None if the
        option is not sensitive or no ``<key>_secret`` entry exists
    """
    if (section, key) not in self.sensitive_config_values:
        return None
    fallback_key = key + "_secret"
    if not super().has_option(section, fallback_key):
        return None
    secrets_path = super().get(section, fallback_key)
    return _get_config_value_from_secret_backend(secrets_path)
| |
def _get_secret_option_from_config_sources(
    self, config_sources: ConfigSourcesType, section: str, key: str
) -> str | None:
    """Resolve a sensitive option through the ``<key>_secret`` entry of ``config_sources``.

    The stored entry may be either the secret path itself or a sequence
    whose first element is the path.

    :return: the value fetched from the secrets backend, or None when no path is configured
    """
    if (section, key) not in self.sensitive_config_values:
        return None
    section_dict = config_sources.get(section)
    if section_dict is None:
        return None
    secrets_path_value = section_dict.get(key + "_secret")
    if secrets_path_value is None:
        return None
    secrets_path = secrets_path_value if isinstance(secrets_path_value, str) else secrets_path_value[0]
    return _get_config_value_from_secret_backend(secrets_path)
| |
def get_mandatory_value(self, section: str, key: str, **kwargs) -> str:
    """Return the option value, insisting that it is not None.

    :raises ValueError: if ``get`` yields None for ``section``/``key``
    """
    value = self.get(section, key, _extra_stacklevel=1, **kwargs)
    if value is not None:
        return value
    raise ValueError(f"The value {section}/{key} should be set!")
| |
def get_mandatory_list_value(self, section: str, key: str, **kwargs) -> list[str]:
    """Return the option parsed by ``getlist``, insisting that it is not None.

    :raises ValueError: if ``getlist`` yields None for ``section``/``key``
    """
    value = self.getlist(section, key, **kwargs)
    if value is not None:
        return value
    raise ValueError(f"The value {section}/{key} should be set!")
| |
@overload  # type: ignore[override]
def get(self, section: str, key: str, fallback: str = ..., **kwargs) -> str: ...

@overload  # type: ignore[override]
def get(self, section: str, key: str, **kwargs) -> str | None: ...

def get(  # type: ignore[override,misc]
    self,
    section: str,
    key: str,
    suppress_warnings: bool = False,
    _extra_stacklevel: int = 0,
    **kwargs,
) -> str | None:
    """Return the value of ``section``/``key``, honouring deprecated names.

    Section and key are lower-cased first. Sources are consulted in this
    order and the first one yielding a value wins: environment variables,
    the config file, ``_cmd`` commands, secrets backends, the default
    config and finally the pre-2.7 defaults. At each step the deprecated
    section/key (if any) is checked after the current name.

    :param section: section to read from
    :param key: option name
    :param suppress_warnings: if True, do not log a warning when the option is not found
    :param _extra_stacklevel: extra frames to skip when attributing emitted warnings
    :param kwargs: forwarded to the underlying lookups (e.g. ``fallback``)
    :raises AirflowConfigException: if the option is not found in any source
    """
    section = section.lower()
    key = key.lower()
    warning_emitted = False
    deprecated_section: str | None
    deprecated_key: str | None

    option_description = self.configuration_description.get(section, {}).get(key, {})
    if option_description.get("deprecated"):
        deprecation_reason = option_description.get("deprecation_reason", "")
        warnings.warn(
            f"The '{key}' option in section {section} is deprecated. {deprecation_reason}",
            DeprecationWarning,
            stacklevel=2 + _extra_stacklevel,
        )
    # For when we rename whole sections
    if section in self.inversed_deprecated_sections:
        deprecated_section, deprecated_key = (section, key)
        section = self.inversed_deprecated_sections[section]
        if not self._suppress_future_warnings:
            warnings.warn(
                f"The config section [{deprecated_section}] has been renamed to "
                f"[{section}]. Please update your `conf.get*` call to use the new name",
                FutureWarning,
                stacklevel=2 + _extra_stacklevel,
            )
        # Don't warn about individual rename if the whole section is renamed
        warning_emitted = True
    elif (section, key) in self.inversed_deprecated_options:
        # Handle using deprecated section/key instead of the new section/key
        new_section, new_key = self.inversed_deprecated_options[(section, key)]
        if not self._suppress_future_warnings and not warning_emitted:
            warnings.warn(
                # The trailing space after "use" keeps the two fragments from running together
                f"section/key [{section}/{key}] has been deprecated, you should use "
                f"[{new_section}/{new_key}] instead. Please update your `conf.get*` call to use the "
                "new name",
                FutureWarning,
                stacklevel=2 + _extra_stacklevel,
            )
            warning_emitted = True
        deprecated_section, deprecated_key = section, key
        section, key = (new_section, new_key)
    elif section in self.deprecated_sections:
        # When accessing the new section name, make sure we check under the old config name
        deprecated_key = key
        deprecated_section = self.deprecated_sections[section][0]
    else:
        deprecated_section, deprecated_key, _ = self.deprecated_options.get(
            (section, key), (None, None, None)
        )
    # first check environment variables
    option = self._get_environment_variables(
        deprecated_key,
        deprecated_section,
        key,
        section,
        issue_warning=not warning_emitted,
        extra_stacklevel=_extra_stacklevel,
    )
    if option is not None:
        return option

    # ...then the config file
    option = self._get_option_from_config_file(
        deprecated_key,
        deprecated_section,
        key,
        kwargs,
        section,
        issue_warning=not warning_emitted,
        extra_stacklevel=_extra_stacklevel,
    )
    if option is not None:
        return option

    # ...then commands
    option = self._get_option_from_commands(
        deprecated_key,
        deprecated_section,
        key,
        section,
        issue_warning=not warning_emitted,
        extra_stacklevel=_extra_stacklevel,
    )
    if option is not None:
        return option

    # ...then from secret backends
    option = self._get_option_from_secrets(
        deprecated_key,
        deprecated_section,
        key,
        section,
        issue_warning=not warning_emitted,
        extra_stacklevel=_extra_stacklevel,
    )
    if option is not None:
        return option

    # ...then the default config
    if self.get_default_value(section, key) is not None or "fallback" in kwargs:
        return expand_env_var(self.get_default_value(section, key, **kwargs))

    if self.get_default_pre_2_7_value(section, key) is not None:
        # no expansion needed
        return self.get_default_pre_2_7_value(section, key, **kwargs)

    if not suppress_warnings:
        log.warning("section/key [%s/%s] not found in config", section, key)

    raise AirflowConfigException(f"section/key [{section}/{key}] not found in config")
| |
def _get_option_from_secrets(
    self,
    deprecated_key: str | None,
    deprecated_section: str | None,
    key: str,
    section: str,
    issue_warning: bool = True,
    extra_stacklevel: int = 0,
) -> str | None:
    """Resolve an option through the secrets backend, trying the deprecated name second.

    :return: the first truthy value found, or None
    """
    current = self._get_secret_option(section, key)
    if current:
        return current
    if not (deprecated_section and deprecated_key):
        return None

    with self.suppress_future_warnings():
        legacy = self._get_secret_option(deprecated_section, deprecated_key)
    if not legacy:
        return None
    # The value came from the deprecated location: tell the user, unless already warned
    if issue_warning:
        self._warn_deprecate(section, key, deprecated_section, deprecated_key, extra_stacklevel)
    return legacy
| |
def _get_option_from_commands(
    self,
    deprecated_key: str | None,
    deprecated_section: str | None,
    key: str,
    section: str,
    issue_warning: bool = True,
    extra_stacklevel: int = 0,
) -> str | None:
    """Resolve an option through its ``_cmd`` command, trying the deprecated name second.

    :return: the first truthy value found, or None
    """
    current = self._get_cmd_option(section, key)
    if current:
        return current
    if not (deprecated_section and deprecated_key):
        return None

    with self.suppress_future_warnings():
        legacy = self._get_cmd_option(deprecated_section, deprecated_key)
    if not legacy:
        return None
    # The value came from the deprecated location: tell the user, unless already warned
    if issue_warning:
        self._warn_deprecate(section, key, deprecated_section, deprecated_key, extra_stacklevel)
    return legacy
| |
def _get_option_from_config_file(
    self,
    deprecated_key: str | None,
    deprecated_section: str | None,
    key: str,
    kwargs: dict[str, Any],
    section: str,
    issue_warning: bool = True,
    extra_stacklevel: int = 0,
) -> str | None:
    """Read an option from the config file, trying the deprecated name second.

    The parent class methods are used on purpose so that only values really
    present in the config (and not the default config) are considered.

    :return: the value with environment variables expanded, or None if absent
    """
    if super().has_option(section, key):
        return expand_env_var(super().get(section, key, **kwargs))

    has_deprecated_name = bool(deprecated_section and deprecated_key)
    if has_deprecated_name and super().has_option(deprecated_section, deprecated_key):
        if issue_warning:
            self._warn_deprecate(section, key, deprecated_section, deprecated_key, extra_stacklevel)
        with self.suppress_future_warnings():
            return expand_env_var(super().get(deprecated_section, deprecated_key, **kwargs))
    return None
| |
def _get_environment_variables(
    self,
    deprecated_key: str | None,
    deprecated_section: str | None,
    key: str,
    section: str,
    issue_warning: bool = True,
    extra_stacklevel: int = 0,
) -> str | None:
    """Resolve an option from environment variables, trying the deprecated name second.

    Unlike the command and secret lookups, any value that is not None
    (including an empty string) counts as found.

    :return: the first value that is not None, or None
    """
    current = self._get_env_var_option(section, key)
    if current is not None:
        return current
    if not (deprecated_section and deprecated_key):
        return None

    with self.suppress_future_warnings():
        legacy = self._get_env_var_option(deprecated_section, deprecated_key)
    if legacy is None:
        return None
    # The value came from the deprecated location: tell the user, unless already warned
    if issue_warning:
        self._warn_deprecate(section, key, deprecated_section, deprecated_key, extra_stacklevel)
    return legacy
| |
def getboolean(self, section: str, key: str, **kwargs) -> bool:  # type: ignore[override]
    """Return the option interpreted as a boolean.

    The value is stringified, lower-cased and stripped; anything after a
    ``#`` is discarded. ``t``/``true``/``1`` map to True and
    ``f``/``false``/``0`` map to False.

    :raises AirflowConfigException: for any other value
    """
    raw_value = self.get(section, key, _extra_stacklevel=1, **kwargs)
    val = str(raw_value).lower().strip()
    if "#" in val:
        # Drop a trailing inline comment
        val = val.partition("#")[0].strip()
    if val in ("t", "true", "1"):
        return True
    if val in ("f", "false", "0"):
        return False
    raise AirflowConfigException(
        f'Failed to convert value to bool. Please check "{key}" key in "{section}" section. '
        f'Current value: "{val}".'
    )
| |
def getint(self, section: str, key: str, **kwargs) -> int:  # type: ignore[override]
    """Return the option converted with ``int()``.

    :raises AirflowConfigException: if the value is None or not a valid integer
    """
    raw_value = self.get(section, key, _extra_stacklevel=1, **kwargs)
    if raw_value is None:
        raise AirflowConfigException(
            f"Failed to convert value None to int. "
            f'Please check "{key}" key in "{section}" section is set.'
        )
    try:
        converted = int(raw_value)
    except ValueError:
        raise AirflowConfigException(
            f'Failed to convert value to int. Please check "{key}" key in "{section}" section. '
            f'Current value: "{raw_value}".'
        )
    return converted
| |
def getfloat(self, section: str, key: str, **kwargs) -> float:  # type: ignore[override]
    """Return the option converted with ``float()``.

    :raises AirflowConfigException: if the value is None or not a valid float
    """
    raw_value = self.get(section, key, _extra_stacklevel=1, **kwargs)
    if raw_value is None:
        raise AirflowConfigException(
            f"Failed to convert value None to float. "
            f'Please check "{key}" key in "{section}" section is set.'
        )
    try:
        converted = float(raw_value)
    except ValueError:
        raise AirflowConfigException(
            f'Failed to convert value to float. Please check "{key}" key in "{section}" section. '
            f'Current value: "{raw_value}".'
        )
    return converted
| |
def getlist(self, section: str, key: str, delimiter=",", **kwargs):
    """Return the option split on ``delimiter`` with every item stripped.

    :param section: section to read from
    :param key: option name
    :param delimiter: separator between list items
    :param kwargs: forwarded to ``get`` (e.g. ``fallback``)
    :raises AirflowConfigException: if the value is None or cannot be split
    """
    # Like the other typed getters, skip this frame when ``get`` attributes
    # deprecation warnings; an explicitly passed value is left untouched.
    kwargs.setdefault("_extra_stacklevel", 1)
    val = self.get(section, key, **kwargs)
    if val is None:
        raise AirflowConfigException(
            f"Failed to convert value None to list. "
            f'Please check "{key}" key in "{section}" section is set.'
        )
    try:
        return [item.strip() for item in val.split(delimiter)]
    except Exception as err:
        raise AirflowConfigException(
            f'Failed to parse value to a list. Please check "{key}" key in "{section}" section. '
            f'Current value: "{val}".'
        ) from err
| |
def getimport(self, section: str, key: str, **kwargs) -> Any:
    """
    Read an option, import the fully qualified name it holds, and return the object.

    The value is read from this parser instance (not from a module-level
    singleton), so the method works on any instance.

    :param section: section to read from
    :param key: option name holding the dotted path
    :param kwargs: forwarded to ``get`` (e.g. ``fallback``)
    :raises AirflowConfigException: if the import fails; the message names the key and section
    :return: The object or None, if the option is empty
    """
    # Skip this frame when ``get`` attributes deprecation warnings
    kwargs.setdefault("_extra_stacklevel", 1)
    full_qualified_path = self.get(section=section, key=key, **kwargs)
    if not full_qualified_path:
        return None

    try:
        return import_string(full_qualified_path)
    except ImportError as e:
        log.warning(e)
        raise AirflowConfigException(
            f'The object could not be loaded. Please check "{key}" key in "{section}" section. '
            f'Current value: "{full_qualified_path}".'
        ) from e
| |
def getjson(
    self, section: str, key: str, fallback=None, **kwargs
) -> dict | list | str | int | float | None:
    """
    Return the option decoded from its JSON text.

    When the option is missing or empty, ``fallback`` is returned as is;
    it is *not* run through the JSON parser.

    :raises AirflowConfigException: if the stored text is not valid JSON
    """
    try:
        data = self.get(section=section, key=key, fallback=None, _extra_stacklevel=1, **kwargs)
    except (NoSectionError, NoOptionError):
        data = None

    no_value = data is None or data == ""
    if no_value:
        return fallback

    try:
        parsed = json.loads(data)
    except JSONDecodeError as e:
        raise AirflowConfigException(f"Unable to parse [{section}] {key!r} as valid json") from e
    return parsed
| |
def gettimedelta(
    self, section: str, key: str, fallback: Any = None, **kwargs
) -> datetime.timedelta | None:
    """
    Read the option as a whole number of seconds and return it as a ``datetime.timedelta``.

    A missing or empty value yields ``fallback``.

    :param section: the section from the config
    :param key: the key defined in the given section
    :param fallback: fallback value when no config value is given, defaults to None
    :raises AirflowConfigException: raised because ValueError or OverflowError
    :return: datetime.timedelta(seconds=<config_value>) or None
    """
    val = self.get(section, key, fallback=fallback, _extra_stacklevel=1, **kwargs)
    if not val:
        return fallback

    # the given value must be convertible to integer
    try:
        seconds = int(val)
    except ValueError:
        raise AirflowConfigException(
            f'Failed to convert value to int. Please check "{key}" key in "{section}" section. '
            f'Current value: "{val}".'
        )

    try:
        return datetime.timedelta(seconds=seconds)
    except OverflowError as err:
        raise AirflowConfigException(
            f"Failed to convert value to timedelta in `seconds`. "
            f"{err}. "
            f'Please check "{key}" key in "{section}" section. Current value: "{val}".'
        )
| |
def read(
    self,
    filenames: (str | bytes | os.PathLike | Iterable[str | bytes | os.PathLike]),
    encoding=None,
) -> list[str]:
    """Read and parse one or several configuration files.

    Thin override that adds type hints and delegates to the parent class.

    :param filenames: a single path or an iterable of paths
    :param encoding: encoding passed through to the parent ``read``
    :return: whatever the parent ``read`` returns (for the standard library
        parser, the list of files that were read successfully)
    """
    # Hand the parent's result back instead of silently dropping it
    return super().read(filenames=filenames, encoding=encoding)
| |
def read_dict(  # type: ignore[override]
    self, dictionary: dict[str, dict[str, Any]], source: str = "<dict>"
):
    """
    Load configuration from a ``{section: {option: value}}`` dictionary.

    The signature is redefined here only to add better type hints and checking.

    :param dictionary: dictionary to read from
    :param source: source to be used to store the configuration
    :return:
    """
    super().read_dict(dictionary, source)
| |
def has_option(self, section: str, option: str) -> bool:
    """
    Tell whether an option resolves to a value.

    Relies on self.get() so the priority order of config variables
    (env, config, cmd, defaults) does not have to be reimplemented.

    :param section: section to get option from
    :param option: option to get
    :return: True if ``get`` yields a value that is not None
    """
    try:
        value = self.get(section, option, fallback=None, _extra_stacklevel=1, suppress_warnings=True)
    except (NoOptionError, NoSectionError):
        return False
    return value is not None
| |
def set(self, section: str, option: str, value: str | None = None) -> None:
    """
    Store ``value`` under ``section``/``option``.

    Section and option names are lower-cased first, mirroring what `get` does.
    """
    super().set(section.lower(), option.lower(), value)
| |
def remove_option(self, section: str, option: str, remove_default: bool = True):
    """
    Remove an option from the file-based config and, optionally, from the default config.

    Names are lower-cased first. When the option exists in both configs it
    is removed from both, unless remove_default=False.
    """
    section, option = section.lower(), option.lower()
    if super().has_option(section, option):
        super().remove_option(section, option)

    has_default = self.get_default_value(section, option) is not None
    if has_default and remove_default:
        self._default_values.remove_option(section, option)
| |
def getsection(self, section: str) -> ConfigOptionsDictType | None:
    """
    Return the section as a dict.

    Default values come first, then values set on this parser, then
    environment variables whose name starts with the section's env prefix.
    String values are converted to int, float or bool where they parse as such.

    :param section: section from the config
    :raises AirflowConfigException: if any collected value is None
    :return: the merged options, or None if the section exists neither in
        this parser nor in the defaults
    """
    if not self.has_section(section) and not self._default_values.has_section(section):
        return None
    if self._default_values.has_section(section):
        _section: ConfigOptionsDictType = dict(self._default_values.items(section))
    else:
        _section = {}

    if self.has_section(section):
        _section.update(self.items(section))

    section_prefix = self._env_var_name(section, "")
    prefix_length = len(section_prefix)
    for env_var in sorted(os.environ):
        if not env_var.startswith(section_prefix):
            continue
        # Strip the leading prefix only: str.replace would also remove any
        # later occurrence of the prefix inside the key.
        key = env_var[prefix_length:]
        if key.endswith("_CMD"):
            key = key[:-4]
        key = key.lower()
        _section[key] = self._get_env_var_option(section, key)

    for key, val in _section.items():
        if val is None:
            raise AirflowConfigException(
                f"Failed to convert value automatically. "
                f'Please check "{key}" key in "{section}" section is set.'
            )
        # Try int, then float, then the boolean spellings; otherwise keep the value as is
        try:
            _section[key] = int(val)
        except ValueError:
            try:
                _section[key] = float(val)
            except ValueError:
                if isinstance(val, str) and val.lower() in ("t", "true"):
                    _section[key] = True
                elif isinstance(val, str) and val.lower() in ("f", "false"):
                    _section[key] = False
    return _section
| |
def as_dict(
    self,
    display_source: bool = False,
    display_sensitive: bool = False,
    raw: bool = False,
    include_env: bool = True,
    include_cmds: bool = True,
    include_secret: bool = True,
) -> ConfigSourcesType:
    """
    Materialize the current configuration as a dictionary of dictionaries.

    Airflow defaults are materialized together with the user-set configs.
    When one of the `include_*` flags is False, the corresponding command or
    secret key configs are passed through instead of being resolved. To keep
    Airflow defaults from overwriting such user-set command or secret key
    configs, bare sensitive_config_values that merely hold the Airflow
    default are filtered out when a command or secret key config produces a
    value for them.

    :param display_source: If False, the option value is returned. If True,
        a tuple of (option_value, source) is returned. Source is either
        'airflow.cfg', 'default', 'env var', or 'cmd'.
    :param display_sensitive: If True, the values of options set by env
        vars and bash commands will be displayed. If False, those options
        are shown as '< hidden >'
    :param raw: Should the values be output as interpolated values, or the
        "raw" form that can be fed back in to ConfigParser
    :param include_env: Should the value of configuration from AIRFLOW__
        environment variables be included or not
    :param include_cmds: Should the result of calling any *_cmd config be
        set (True, default), or should the _cmd options be left as the
        command to run (False)
    :param include_secret: Should the result of calling any *_secret config be
        set (True, default), or should the _secret options be left as the
        path to get the secret from (False)
    :raises ValueError: if ``display_sensitive`` is False while any of the
        ``include_*`` flags is False
    :return: Dictionary, where the key is the name of the section and the content is
        the dictionary with the name of the parameter and its value.
    """
    # Sensitive values are hidden inside the include_* helpers (envs from
    # cmds and secrets can be read in _include_envs), so all of them must run.
    if not display_sensitive and not (include_env and include_cmds and include_secret):
        raise ValueError(
            "If display_sensitive is false, then include_env, "
            "include_cmds, include_secret must all be set as True"
        )

    config_sources: ConfigSourcesType = {}

    # Sources are checked sequentially; the last one an option is seen in "wins"
    configs: Iterable[tuple[str, ConfigParser]] = [
        ("default-pre-2-7", self._pre_2_7_default_values),
        ("default", self._default_values),
        ("airflow.cfg", self),
    ]

    self._replace_config_with_display_sources(
        config_sources,
        configs,
        self.configuration_description or {},
        display_source,
        raw,
        self.deprecated_options,
        include_cmds=include_cmds,
        include_env=include_env,
        include_secret=include_secret,
    )

    # env vars are added next and overwrite because they have priority
    if include_env:
        self._include_envs(config_sources, display_sensitive, display_source, raw)
    else:
        self._filter_by_source(config_sources, display_source, self._get_env_var_option)

    # bash commands
    if include_cmds:
        self._include_commands(config_sources, display_sensitive, display_source, raw)
    else:
        self._filter_by_source(config_sources, display_source, self._get_cmd_option)

    # config from secret backends
    if include_secret:
        self._include_secrets(config_sources, display_sensitive, display_source, raw)
    else:
        self._filter_by_source(config_sources, display_source, self._get_secret_option)

    if display_sensitive:
        return config_sources

    # Make sure sensitive values coming from the config file are hidden too
    # when they were not provided through env, cmd or secret
    hidden = "< hidden >"
    for section, key in self.sensitive_config_values:
        section_options = config_sources.get(section)
        if not section_options or not section_options.get(key, None):
            continue
        if display_source:
            section_options[key] = (hidden, section_options[key][1])
        else:
            section_options[key] = hidden

    return config_sources
| |
def _include_secrets(
    self,
    config_sources: ConfigSourcesType,
    display_sensitive: bool,
    display_source: bool,
    raw: bool,
):
    """Replace ``<key>_secret`` entries in ``config_sources`` by the resolved ``<key>`` value.

    :param config_sources: configuration being assembled; modified in place
    :param display_sensitive: if False the resolved value is shown as '< hidden >'
    :param display_source: if True store a ``(value, "secret")`` tuple
    :param raw: if True (and no source is displayed) escape ``%`` as ``%%``
    """
    for section, key in self.sensitive_config_values:
        value: str | None = self._get_secret_option_from_config_sources(config_sources, section, key)
        if not value:
            continue
        if not display_sensitive:
            value = "< hidden >"

        opt: str | tuple[str, str]
        if display_source:
            opt = (value, "secret")
        elif raw:
            opt = value.replace("%", "%%")
        else:
            opt = value

        section_options = config_sources.setdefault(section, {})
        section_options[key] = opt
        # The path entry is superseded by the resolved value
        del config_sources[section][key + "_secret"]
| |
def _include_commands(
    self,
    config_sources: ConfigSourcesType,
    display_sensitive: bool,
    display_source: bool,
    raw: bool,
):
    """Replace ``<key>_cmd`` entries in ``config_sources`` by the command's result.

    :param config_sources: configuration being assembled; modified in place
    :param display_sensitive: if False the result is shown as '< hidden >'
    :param display_source: if True store a ``(value, "cmd")`` tuple
    :param raw: if True (and no source is displayed) escape ``%`` as ``%%``
    """
    for section, key in self.sensitive_config_values:
        result = self._get_cmd_option_from_config_sources(config_sources, section, key)
        if not result:
            continue

        opt_to_set: str | tuple[str, str] = result if display_sensitive else "< hidden >"
        if display_source:
            opt_to_set = (str(opt_to_set), "cmd")
        elif raw:
            opt_to_set = str(opt_to_set).replace("%", "%%")

        section_options = config_sources.setdefault(section, {})
        section_options[key] = opt_to_set
        # The command entry is superseded by its result
        del config_sources[section][key + "_cmd"]
| |
def _include_envs(
    self,
    config_sources: ConfigSourcesType,
    display_sensitive: bool,
    display_source: bool,
    raw: bool,
):
    """Overlay options coming from prefixed environment variables onto ``config_sources``.

    :param config_sources: configuration being assembled; modified in place
    :param display_sensitive: if False, sensitive values may be replaced by '< hidden >'
    :param display_source: if True store a ``(value, "env var")`` tuple
    :param raw: if True escape ``%`` as ``%%`` (only on the branch where
        nothing is being hidden)
    """
    airflow_env_vars = [name for name in os.environ if name.startswith(ENV_VAR_PREFIX)]
    for env_var in airflow_env_vars:
        try:
            _, section, key = env_var.split("__", 2)
            opt = self._get_env_var_option(section, key)
        except ValueError:
            # Name does not have the PREFIX__SECTION__KEY shape (or lookup rejected it)
            continue
        if opt is None:
            log.warning("Ignoring unknown env var '%s'", env_var)
            continue

        if not display_sensitive and env_var != self._env_var_name("core", "unit_test_mode"):
            # Don't hide cmd/secret values here
            # NOTE(review): section/key still carry the env var's casing at this
            # point; if sensitive_config_values holds lower-case pairs this
            # membership test never matches - confirm.
            is_cmd_or_secret = env_var.lower().endswith(("cmd", "secret"))
            if not is_cmd_or_secret and (section, key) in self.sensitive_config_values:
                opt = "< hidden >"
        elif raw:
            opt = opt.replace("%", "%%")
        if display_source:
            opt = (opt, "env var")

        section = section.lower()
        # Keys of the kubernetes_environment_variables section must keep
        # their case: lowering them would make it impossible to set any
        # Airflow environment variable through it, since Airflow only parses
        # environment variables starting with AIRFLOW_.
        if section != "kubernetes_environment_variables":
            key = key.lower()
        config_sources.setdefault(section, {})[key] = opt
| |
def _filter_by_source(
    self,
    config_sources: ConfigSourcesType,
    display_source: bool,
    getter_func,
):
    """
    Drop sensitive options that only hold the Airflow default when an override exists.

    Bare configs take precedence over their command or secret key
    equivalents. If the running config were materialized with Airflow
    defaults, those defaults would in turn override user-set command or
    secret key configs, so such defaults are deleted here.

    :param config_sources: The current configuration to operate on
    :param display_source: If False, configuration options contain raw
        values. If True, options are a tuple of (option_value, source).
        Source is either 'airflow.cfg', 'default', 'env var', or 'cmd'.
    :param getter_func: A callback function that gets the user configured
        override value for a particular sensitive_config_values config.
    :return: None, the given config_sources is filtered if necessary,
        otherwise untouched.
    """
    for section, key in self.sensitive_config_values:
        # Nothing to filter when the section / key is absent
        if key not in config_sources.get(section, ()):
            continue
        # An override must exist for the default to be in the way
        try:
            override = getter_func(section, key)
        except ValueError:
            continue
        if not override:
            continue
        # ...and there must be a default value to compare with
        default_value = self.get_default_value(section, key)
        if default_value is None:
            continue
        current = config_sources[section][key]
        if display_source:
            # with display_source the stored entry is a (value, source) tuple
            current, _source = current  # type: ignore
        # Delete the bare setting only when it is just the default
        if current == default_value:
            del config_sources[section][key]
| |
@staticmethod
def _replace_config_with_display_sources(
    config_sources: ConfigSourcesType,
    configs: Iterable[tuple[str, ConfigParser]],
    configuration_description: dict[str, dict[str, Any]],
    display_source: bool,
    raw: bool,
    deprecated_options: dict[tuple[str, str], tuple[str, str, str]],
    include_env: bool,
    include_cmds: bool,
    include_secret: bool,
):
    """
    Feed every section of every given parser to the per-section merge helper.

    ``configs`` is walked in order; for each parser, each of its sections is handed to
    ``_replace_section_config_with_display_sources`` together with all remaining arguments,
    which are forwarded unchanged.

    :param config_sources: mapping passed on to the per-section helper
    :param configs: ``(source_name, parser)`` pairs; also forwarded as a whole to the helper
    :param configuration_description: forwarded unchanged
    :param display_source: forwarded unchanged
    :param raw: forwarded unchanged
    :param deprecated_options: forwarded unchanged
    :param include_env: forwarded unchanged (as keyword)
    :param include_cmds: forwarded unchanged (as keyword)
    :param include_secret: forwarded unchanged (as keyword)
    """
    include_flags = {
        "include_env": include_env,
        "include_cmds": include_cmds,
        "include_secret": include_secret,
    }
    for source_name, parser in configs:
        for section_name in parser.sections():
            AirflowConfigParser._replace_section_config_with_display_sources(
                parser,
                config_sources,
                configuration_description,
                display_source,
                raw,
                section_name,
                source_name,
                deprecated_options,
                configs,
                **include_flags,
            )
| |
@staticmethod
def _deprecated_value_is_set_in_config(
    deprecated_section: str,
    deprecated_key: str,
    configs: Iterable[tuple[str, ConfigParser]],
) -> bool:
    """
    Tell whether a deprecated option is present in any non-default config source.

    :param deprecated_section: section that holds the deprecated option
    :param deprecated_key: name of the deprecated option
    :param configs: ``(source_name, parser)`` pairs; entries named "default" are ignored
    :return: True as soon as one non-default parser lists the key in that section, else False
    """
    for origin, parser in configs:
        if origin == "default":
            continue
        try:
            for option_name, _ in parser.items(section=deprecated_section, raw=True):
                if option_name == deprecated_key:
                    return True
        except NoSectionError:
            # This source does not have the section at all - try the next one.
            continue
    return False
| |
@staticmethod
def _deprecated_variable_is_set(deprecated_section: str, deprecated_key: str) -> bool:
    """
    Tell whether the environment variable for a deprecated option is defined.

    The variable name is ``ENV_VAR_PREFIX`` followed by the upper-cased section, a double
    underscore and the upper-cased key.
    """
    env_name = f"{ENV_VAR_PREFIX}{deprecated_section.upper()}__{deprecated_key.upper()}"
    return env_name in os.environ
| |
@staticmethod
def _deprecated_command_is_set_in_config(
    deprecated_section: str,
    deprecated_key: str,
    configs: Iterable[tuple[str, ConfigParser]],
) -> bool:
    """
    Tell whether the ``<deprecated_key>_cmd`` option is present in any non-default config source.

    Delegates to ``_deprecated_value_is_set_in_config`` with the ``_cmd`` suffix appended to the key.
    """
    command_key = f"{deprecated_key}_cmd"
    return AirflowConfigParser._deprecated_value_is_set_in_config(
        deprecated_section=deprecated_section,
        deprecated_key=command_key,
        configs=configs,
    )
| |
@staticmethod
def _deprecated_variable_command_is_set(deprecated_section: str, deprecated_key: str) -> bool:
    """
    Tell whether the ``..._CMD`` environment variable for a deprecated option is defined.

    The variable name is ``ENV_VAR_PREFIX`` followed by the upper-cased section, a double
    underscore, the upper-cased key and the ``_CMD`` suffix.
    """
    env_name = f"{ENV_VAR_PREFIX}{deprecated_section.upper()}__{deprecated_key.upper()}_CMD"
    return env_name in os.environ
| |
@staticmethod
def _deprecated_secret_is_set_in_config(
    deprecated_section: str,
    deprecated_key: str,
    configs: Iterable[tuple[str, ConfigParser]],
) -> bool:
    """
    Tell whether the ``<deprecated_key>_secret`` option is present in any non-default config source.

    Delegates to ``_deprecated_value_is_set_in_config`` with the ``_secret`` suffix appended to the key.
    """
    secret_key = f"{deprecated_key}_secret"
    return AirflowConfigParser._deprecated_value_is_set_in_config(
        deprecated_section=deprecated_section,
        deprecated_key=secret_key,
        configs=configs,
    )
| |
@staticmethod
def _deprecated_variable_secret_is_set(deprecated_section: str, deprecated_key: str) -> bool:
    """
    Tell whether the ``..._SECRET`` environment variable for a deprecated option is defined.

    The variable name is ``ENV_VAR_PREFIX`` followed by the upper-cased section, a double
    underscore, the upper-cased key and the ``_SECRET`` suffix.
    """
    env_name = f"{ENV_VAR_PREFIX}{deprecated_section.upper()}__{deprecated_key.upper()}_SECRET"
    return env_name in os.environ
| |
@contextmanager
def suppress_future_warnings(self):
    """
    Set ``self._suppress_future_warnings`` to True for the duration of a ``with`` block.

    The previous value of the flag is remembered on entry and put back on exit. The restore
    happens in a ``finally`` clause, so the flag is also reset when the body of the ``with``
    block raises; without it an exception would leave the flag stuck at True.

    :return: context manager yielding this object
    """
    previous = self._suppress_future_warnings
    self._suppress_future_warnings = True
    try:
        yield self
    finally:
        self._suppress_future_warnings = previous
| |
@staticmethod
def _replace_section_config_with_display_sources(
    config: ConfigParser,
    config_sources: ConfigSourcesType,
    configuration_description: dict[str, dict[str, Any]],
    display_source: bool,
    raw: bool,
    section: str,
    source_name: str,
    deprecated_options: dict[tuple[str, str], tuple[str, str, str]],
    configs: Iterable[tuple[str, ConfigParser]],
    include_env: bool,
    include_cmds: bool,
    include_secret: bool,
):
    """
    Copy the options of one section of ``config`` into ``config_sources[section]``.

    When ``source_name`` is "default", an option that replaces a deprecated option is skipped if
    that deprecated option is set in a non-default config, or - depending on the ``include_*``
    flags - through an environment variable, a ``_cmd`` variant or a ``_secret`` variant.

    :param config: parser the section is read from
    :param config_sources: target mapping; the entry for ``section`` is created if missing and
        updated in place
    :param configuration_description: consulted for a per-option "source" entry when
        ``display_source`` is True and ``source_name`` is "default"
    :param display_source: if True store ``(value, source)`` tuples, otherwise bare values
    :param raw: forwarded to ``config.items``
    :param section: name of the section to copy
    :param source_name: label of the source that ``config`` represents
    :param deprecated_options: maps ``(section, key)`` to a tuple whose first two items are the
        deprecated section and the deprecated key
    :param configs: all ``(source_name, parser)`` pairs, searched for deprecated values
    :param include_env: also check the environment variable of the deprecated option
    :param include_cmds: also check the ``_cmd`` variants of the deprecated option
    :param include_secret: also check the ``_secret`` variants of the deprecated option
    """
    # Get (or create) the per-section dict that receives the values.
    sect = config_sources.setdefault(section, {})
    if isinstance(config, AirflowConfigParser):
        # AirflowConfigParser items are read inside its suppress_future_warnings() context.
        with config.suppress_future_warnings():
            items: Iterable[tuple[str, Any]] = config.items(section=section, raw=raw)
    else:
        items = config.items(section=section, raw=raw)
    for k, val in items:
        # (None, None, None) is the fallback when this option does not replace a deprecated one.
        deprecated_section, deprecated_key, _ = deprecated_options.get((section, k), (None, None, None))
        if deprecated_section and deprecated_key:
            if source_name == "default":
                # If deprecated entry has some non-default value set for any of the sources requested,
                # We should NOT set default for the new entry (because it will override anything
                # coming from the deprecated ones)
                if AirflowConfigParser._deprecated_value_is_set_in_config(
                    deprecated_section, deprecated_key, configs
                ):
                    continue
                if include_env and AirflowConfigParser._deprecated_variable_is_set(
                    deprecated_section, deprecated_key
                ):
                    continue
                if include_cmds and (
                    AirflowConfigParser._deprecated_variable_command_is_set(
                        deprecated_section, deprecated_key
                    )
                    or AirflowConfigParser._deprecated_command_is_set_in_config(
                        deprecated_section, deprecated_key, configs
                    )
                ):
                    continue
                if include_secret and (
                    AirflowConfigParser._deprecated_variable_secret_is_set(
                        deprecated_section, deprecated_key
                    )
                    or AirflowConfigParser._deprecated_secret_is_set_in_config(
                        deprecated_section, deprecated_key, configs
                    )
                ):
                    continue
        if display_source:
            updated_source_name = source_name
            if source_name == "default":
                # defaults can come from other sources (default-<PROVIDER>) that should be used here
                source_description_section = configuration_description.get(section, {})
                source_description_key = source_description_section.get("options", {}).get(k, {})
                # The option description may be stored as None; keep the plain source name then.
                if source_description_key is not None:
                    updated_source_name = source_description_key.get("source", source_name)
            sect[k] = (val, updated_source_name)
        else:
            sect[k] = val
| |
def load_test_config(self):
    """
    Use test configuration rather than the configuration coming from airflow defaults.

    When running tests we use the special unit test configuration to avoid accidental modifications
    and different behaviours when running the tests. Values for that test configuration are stored
    in the "unit_tests.cfg" configuration file in the ``config_templates`` folder next to this
    module, and you need to change values there if you want some specific configuration to be used.

    All previously read sections are removed before the test configuration is read, the module-level
    ``FERNET_KEY`` is replaced with a freshly generated key, and all values are then expanded.
    """
    # We need those globals before we run "get_all_expansion_variables" because this is where
    # the variables are expanded from in the configuration
    global FERNET_KEY, AIRFLOW_HOME
    from cryptography.fernet import Fernet

    unit_test_config_file = pathlib.Path(__file__).parent / "config_templates" / "unit_tests.cfg"
    unit_test_config = unit_test_config_file.read_text()
    self.remove_all_read_configurations()
    with StringIO(unit_test_config) as test_config_file:
        self.read_file(test_config_file)
    # set fernet key to a random value (FERNET_KEY is already declared global above)
    FERNET_KEY = Fernet.generate_key().decode()
    self.expand_all_configuration_values()
    # Report the file that was actually read instead of a hard-coded, mismatching file name.
    log.info("Unit test configuration loaded from '%s'", unit_test_config_file)
| |
def expand_all_configuration_values(self):
    """
    Re-set every option, expanding ``str.format`` placeholders where applicable.

    Placeholders are filled from the mapping returned by ``get_all_expansion_variables()``.
    Options reported by ``is_template`` and non-string values are written back unchanged;
    options whose value is ``None`` are left alone.
    """
    all_vars = get_all_expansion_variables()
    for section in self.sections():
        for key, value in self.items(section):
            if value is None:
                continue
            # Drop the current entry first, then write the (possibly expanded) value back.
            if self.has_option(section, key):
                self.remove_option(section, key)
            keep_verbatim = self.is_template(section, key) or not isinstance(value, str)
            new_value = value if keep_verbatim else value.format(**all_vars)
            self.set(section, key, new_value)
| |
def remove_all_read_configurations(self):
    """Drop every section currently listed by ``self.sections()``."""
    for section_name in self.sections():
        self.remove_section(section_name)
| |
@property
def providers_configuration_loaded(self) -> bool:
    """Return the value of the ``_providers_configuration_loaded`` flag."""
    return self._providers_configuration_loaded
| |
def load_providers_configuration(self):
    """
    Load configuration for providers.

    This should be done after the initial configuration has been performed. Initializing and
    discovering providers is an expensive operation and cannot be performed when we load
    configuration for the first time when airflow starts, because we initialize configuration very
    early, during importing of the `airflow` package, and the module is not yet ready to be used
    until configuration and settings are loaded. Therefore provider-specific configuration has to
    be loaded additionally, in this separate step.

    Each section contributed by a provider is deep-copied into ``configuration_description`` and
    the section and all of its options are tagged with the source ``default-<provider>``.
    Afterwards the default values parser is rebuilt and the cached ``sensitive_config_values``
    is invalidated.

    :raises AirflowConfigException: when a provider contributes a section that already exists
    """
    log.debug("Loading providers configuration")
    from airflow.providers_manager import ProvidersManager

    self.restore_core_default_configuration()
    for provider, config in ProvidersManager().already_initialized_provider_configs:
        provider_source = f"default-{provider}"
        for provider_section, provider_section_content in config.items():
            provider_options = provider_section_content["options"]
            section_in_current_config = self.configuration_description.get(provider_section)
            if not section_in_current_config:
                self.configuration_description[provider_section] = deepcopy(provider_section_content)
                section_in_current_config = self.configuration_description.get(provider_section)
                section_in_current_config["source"] = provider_source
                for option in provider_options:
                    section_in_current_config["options"][option]["source"] = provider_source
            else:
                # Strip the "default-" prefix so only the contributing provider name is shown.
                section_source = section_in_current_config.get("source", "Airflow's core package").split(
                    "default-"
                )[-1]
                # Only the message is passed: an extra positional argument would end up in
                # ``args`` and make ``str(exc)`` render as a tuple instead of the message.
                raise AirflowConfigException(
                    f"The provider {provider} is attempting to contribute "
                    f"configuration section {provider_section} that "
                    f"has already been added before. The source of it: {section_source}. "
                    "This is forbidden. A provider can only add new sections. It "
                    "cannot contribute options to existing sections or override other "
                    "provider's configuration."
                )
    self._default_values = create_default_config_parser(self.configuration_description)
    # sensitive_config_values needs to be refreshed here. This is a cached_property, so we can delete
    # the cached values, and it will be refreshed on next access. This has been an implementation
    # detail in Python 3.8 but as of Python 3.9 it is documented behaviour.
    # See https://docs.python.org/3/library/functools.html#functools.cached_property
    with contextlib.suppress(AttributeError):
        # no problem if cache is not set yet
        del self.sensitive_config_values
    self._providers_configuration_loaded = True
| |
@staticmethod
def _warn_deprecate(
    section: str, key: str, deprecated_section: str, deprecated_name: str, extra_stacklevel: int
):
    """
    Emit a DeprecationWarning telling that an old option name/location has been used.

    The wording says "renamed" when the option stayed in the same section and "moved" when the
    section changed.

    :param section: section of the current option
    :param key: name of the current option
    :param deprecated_section: section of the deprecated option
    :param deprecated_name: name of the deprecated option
    :param extra_stacklevel: added to the base ``stacklevel`` of 4 passed to ``warnings.warn``
    """
    if section == deprecated_section:
        message = (
            f"The {deprecated_name} option in [{section}] has been renamed to {key} - "
            f"the old setting has been used, but please update your config."
        )
    else:
        message = (
            f"The {deprecated_name} option in [{deprecated_section}] has been moved to the {key} option "
            f"in [{section}] - the old setting has been used, but please update your config."
        )
    warnings.warn(message, DeprecationWarning, stacklevel=4 + extra_stacklevel)
| |
def __getstate__(self) -> dict[str, Any]:
    """Build the pickling state: a dict holding a fixed set of this object's attributes."""
    pickled_attributes = (
        "_sections",
        "is_validated",
        "configuration_description",
        "upgraded_values",
        "_default_values",
    )
    state: dict[str, Any] = {}
    for attribute in pickled_attributes:
        state[attribute] = getattr(self, attribute)
    return state
| |
def __setstate__(self, state) -> None:
    """
    Rebuild the object from the dict produced for pickling.

    The object is re-initialised with no arguments, the ``"_sections"`` entry is removed from
    ``state`` and loaded through ``read_dict``, and the remaining entries are copied into
    ``__dict__``. Note that ``state`` itself is modified (``"_sections"`` is popped).
    """
    self.__init__()  # type: ignore[misc]
    stored_sections = state.pop("_sections")
    self.read_dict(stored_sections)
    self.__dict__.update(state)
| |
| |
def get_airflow_home() -> str:
    """Return the ``AIRFLOW_HOME`` environment variable (default ``~/airflow``) passed through ``expand_env_var``."""
    raw_home = os.environ.get("AIRFLOW_HOME", "~/airflow")
    return expand_env_var(raw_home)
| |
| |
def get_airflow_config(airflow_home: str) -> str:
    """
    Return the path of the Airflow configuration file.

    :param airflow_home: directory used for the default location
    :return: the ``AIRFLOW_CONFIG`` environment variable passed through ``expand_env_var`` when
        it is set, otherwise ``<airflow_home>/airflow.cfg``
    """
    configured_path = os.environ.get("AIRFLOW_CONFIG")
    if configured_path is not None:
        return expand_env_var(configured_path)
    return os.path.join(airflow_home, "airflow.cfg")
| |
| |
def get_all_expansion_variables() -> dict[str, Any]:
    """Return a new dict merging this module's ``globals()`` with ``locals()`` (locals win on clashes)."""
    # No local name is bound before this expression, so locals() is evaluated exactly as in a
    # function body without variables of its own.
    return {**globals(), **locals()}
| |
| |
def _generate_fernet_key() -> str:
    """Return a key produced by ``Fernet.generate_key()``, decoded to ``str``."""
    # Imported lazily, only when a key actually has to be generated.
    from cryptography.fernet import Fernet

    key_bytes = Fernet.generate_key()
    return key_bytes.decode()
| |
| |
def create_default_config_parser(configuration_description: dict[str, dict[str, Any]]) -> ConfigParser:
    """
    Build a ``ConfigParser`` holding the default value of every described option.

    One section is added per entry of the description. String defaults are expanded with
    ``str.format`` using the mapping from ``get_all_expansion_variables()``; defaults of options
    flagged ``is_template`` and non-string defaults are stored as they are. Options whose default
    is ``None`` are not set.

    :param configuration_description: configuration description - retrieved from config.yaml files
        following the schema defined in "config.yml.schema.json" in the config_templates folder.
    :return: Default Config Parser that can be used to read configuration values from.
    """
    default_parser = ConfigParser()
    expansion_vars = get_all_expansion_variables()
    for section_name, section_desc in configuration_description.items():
        default_parser.add_section(section_name)
        for option_name, option_desc in section_desc["options"].items():
            default_value = option_desc["default"]
            is_template = option_desc.get("is_template", False)
            if default_value is None:
                continue
            if not is_template and isinstance(default_value, str):
                default_value = default_value.format(**expansion_vars)
            default_parser.set(section_name, option_name, default_value)
    return default_parser
| |
| |
def create_pre_2_7_defaults() -> ConfigParser:
    """
    Create parser using the old defaults from Airflow < 2.7.0.

    This is used in order to be able to fall back to those defaults when an old version of a
    provider, not supporting "config contribution", is installed with Airflow 2.7.0+. This
    "default" configuration does not support variable expansion; those are pretty much hard-coded
    defaults we want to fall back to in such a case.

    :return: a plain ``ConfigParser`` that has read "pre_2_7_defaults.cfg"
    """
    legacy_defaults = ConfigParser()
    legacy_defaults_path = _default_config_file_path("pre_2_7_defaults.cfg")
    legacy_defaults.read(legacy_defaults_path)
    return legacy_defaults
| |
| |
def write_default_airflow_configuration_if_needed() -> AirflowConfigParser:
    """
    Create the Airflow configuration file at ``AIRFLOW_CONFIG`` when it does not exist yet.

    A missing parent directory is created only when it lies under ``AIRFLOW_HOME``. If no fernet
    key is configured, one is generated, stored in the module-level ``FERNET_KEY`` and set on
    ``conf`` so that it is written to the new file.

    :return: the module-level ``conf`` object
    :raises IsADirectoryError: when ``AIRFLOW_CONFIG`` points to a directory
    :raises FileNotFoundError: when the parent directory is missing and not under ``AIRFLOW_HOME``
    """
    global FERNET_KEY
    airflow_config = pathlib.Path(AIRFLOW_CONFIG)
    config_path = airflow_config.__fspath__()

    if airflow_config.is_dir():
        msg = (
            "Airflow config expected to be a path to the configuration file, "
            f"but got a directory {config_path!r}."
        )
        raise IsADirectoryError(msg)
    if airflow_config.exists():
        # Nothing to write - an existing file is left untouched.
        return conf

    log.debug("Creating new Airflow config file in: %s", config_path)
    config_directory = airflow_config.parent
    if not config_directory.exists():
        directory_path = config_directory.__fspath__()
        # Compatibility with Python 3.8, ``PurePath.is_relative_to`` was added in Python 3.9
        try:
            config_directory.relative_to(AIRFLOW_HOME)
        except ValueError:
            msg = (
                f"Config directory {directory_path!r} not exists "
                f"and it is not relative to AIRFLOW_HOME {AIRFLOW_HOME!r}. "
                "Please create this directory first."
            )
            raise FileNotFoundError(msg) from None
        log.debug("Create directory %r for Airflow config", directory_path)
        config_directory.mkdir(parents=True, exist_ok=True)

    if conf.get("core", "fernet_key", fallback=None) is None:
        # We know that FERNET_KEY is not set, so we can generate it, set as global key
        # and also write it to the config file so that same key will be used next time
        FERNET_KEY = _generate_fernet_key()
        conf.remove_option("core", "fernet_key")
        conf.set("core", "fernet_key", FERNET_KEY)

    # Create the file and restrict its permissions before any content is written to it.
    pathlib.Path(config_path).touch()
    make_group_other_inaccessible(config_path)
    with open(airflow_config, "w") as config_file:
        conf.write(
            config_file,
            include_sources=False,
            include_env_vars=True,
            include_providers=True,
            extra_spacing=True,
            only_defaults=True,
        )
    return conf
| |
| |
def load_standard_airflow_configuration(airflow_config_parser: AirflowConfigParser):
    """
    Read the file at ``AIRFLOW_CONFIG`` into the given parser.

    If the file defines ``[core] airflow_home``, a DeprecationWarning is issued. When the
    ``AIRFLOW_HOME`` environment variable is not set and the configured value differs from the
    current module-level ``AIRFLOW_HOME``, the configured value replaces it.

    :param airflow_config_parser: parser to which the configuration will be loaded
    """
    global AIRFLOW_HOME
    log.info("Reading the config from %s", AIRFLOW_CONFIG)
    airflow_config_parser.read(AIRFLOW_CONFIG)
    if not airflow_config_parser.has_option("core", "AIRFLOW_HOME"):
        return

    msg = (
        "Specifying both AIRFLOW_HOME environment variable and airflow_home "
        "in the config file is deprecated. Please use only the AIRFLOW_HOME "
        "environment variable and remove the config file entry."
    )
    if "AIRFLOW_HOME" in os.environ:
        warnings.warn(msg, category=DeprecationWarning, stacklevel=1)
        return
    if airflow_config_parser.get("core", "airflow_home") == AIRFLOW_HOME:
        warnings.warn(
            "Specifying airflow_home in the config file is deprecated. As you "
            "have left it at the default value you should remove the setting "
            "from your airflow.cfg and suffer no change in behaviour.",
            category=DeprecationWarning,
            stacklevel=1,
        )
        return
    # No environment variable and a non-default value in the file: the file value is adopted.
    AIRFLOW_HOME = airflow_config_parser.get("core", "airflow_home")  # type: ignore[assignment]
    warnings.warn(msg, category=DeprecationWarning, stacklevel=1)
| |
| |
def initialize_config() -> AirflowConfigParser:
    """
    Create the ``AirflowConfigParser`` and load either the test or the standard configuration.

    Also stores the ``[webserver] config_file`` value in the module-level ``WEBSERVER_CONFIG``.

    :return: the populated parser
    """
    global WEBSERVER_CONFIG
    parser = AirflowConfigParser()
    use_test_config = parser.getboolean("core", "unit_test_mode")
    if not use_test_config:
        load_standard_airflow_configuration(parser)
        # If the user set unit_test_mode in the airflow.cfg, we still
        # want to respect that and then load the default unit test configuration
        # file on top of it.
        use_test_config = parser.getboolean("core", "unit_test_mode")
    if use_test_config:
        parser.load_test_config()
    # Set the WEBSERVER_CONFIG variable
    WEBSERVER_CONFIG = parser.get("webserver", "config_file")
    return parser
| |
| |
@providers_configuration_loaded
def write_webserver_configuration_if_needed(airflow_config_parser: AirflowConfigParser):
    """
    Copy the default webserver config to ``[webserver] config_file`` unless that file exists.

    :param airflow_config_parser: parser the target path is read from
    """
    target_path = airflow_config_parser.get("webserver", "config_file")
    if os.path.isfile(target_path):
        return

    import shutil

    pathlib.Path(target_path).parent.mkdir(parents=True, exist_ok=True)
    log.info("Creating new FAB webserver config file in: %s", target_path)
    template_path = _default_config_file_path("default_webserver_config.py")
    shutil.copy(template_path, target_path)
| |
| |
def make_group_other_inaccessible(file_path: str):
    """
    Reduce the mode of ``file_path`` to at most owner read/write.

    The current mode is masked with ``S_IRUSR | S_IWUSR``, so every other permission bit is
    dropped. Any failure is logged as a warning and otherwise ignored.

    :param file_path: path of the file whose mode is changed
    """
    owner_read_write = stat.S_IRUSR | stat.S_IWUSR
    try:
        current_mode = os.stat(file_path).st_mode
        os.chmod(file_path, current_mode & owner_read_write)
    except Exception as e:
        log.warning(
            "Could not change permissions of config file to be group/other inaccessible. "
            "Continuing with original permissions: %s",
            e,
        )
| |
| |
def get(*args, **kwargs) -> ConfigType | None:
    """Deprecated module-level shim: warn, then forward all arguments to ``conf.get``."""
    message = (
        "Accessing configuration method 'get' directly from the configuration module is "
        "deprecated. Please access the configuration from the 'configuration.conf' object via "
        "'conf.get'"
    )
    # stacklevel=2 attributes the warning to the caller of this shim.
    warnings.warn(message, category=DeprecationWarning, stacklevel=2)
    return conf.get(*args, **kwargs)
| |
| |
def getboolean(*args, **kwargs) -> bool:
    """Deprecated module-level shim: warn, then forward all arguments to ``conf.getboolean``."""
    message = (
        "Accessing configuration method 'getboolean' directly from the configuration module is "
        "deprecated. Please access the configuration from the 'configuration.conf' object via "
        "'conf.getboolean'"
    )
    # stacklevel=2 attributes the warning to the caller of this shim.
    warnings.warn(message, category=DeprecationWarning, stacklevel=2)
    return conf.getboolean(*args, **kwargs)
| |
| |
def getfloat(*args, **kwargs) -> float:
    """Deprecated module-level shim: warn, then forward all arguments to ``conf.getfloat``."""
    message = (
        "Accessing configuration method 'getfloat' directly from the configuration module is "
        "deprecated. Please access the configuration from the 'configuration.conf' object via "
        "'conf.getfloat'"
    )
    # stacklevel=2 attributes the warning to the caller of this shim.
    warnings.warn(message, category=DeprecationWarning, stacklevel=2)
    return conf.getfloat(*args, **kwargs)
| |
| |
def getint(*args, **kwargs) -> int:
    """Deprecated module-level shim: warn, then forward all arguments to ``conf.getint``."""
    message = (
        "Accessing configuration method 'getint' directly from the configuration module is "
        "deprecated. Please access the configuration from the 'configuration.conf' object via "
        "'conf.getint'"
    )
    # stacklevel=2 attributes the warning to the caller of this shim.
    warnings.warn(message, category=DeprecationWarning, stacklevel=2)
    return conf.getint(*args, **kwargs)
| |
| |
def getsection(*args, **kwargs) -> ConfigOptionsDictType | None:
    """Deprecated module-level shim: warn, then forward all arguments to ``conf.getsection``."""
    message = (
        "Accessing configuration method 'getsection' directly from the configuration module is "
        "deprecated. Please access the configuration from the 'configuration.conf' object via "
        "'conf.getsection'"
    )
    # stacklevel=2 attributes the warning to the caller of this shim.
    warnings.warn(message, category=DeprecationWarning, stacklevel=2)
    return conf.getsection(*args, **kwargs)
| |
| |
def has_option(*args, **kwargs) -> bool:
    """Deprecated module-level shim: warn, then forward all arguments to ``conf.has_option``."""
    message = (
        "Accessing configuration method 'has_option' directly from the configuration module is "
        "deprecated. Please access the configuration from the 'configuration.conf' object via "
        "'conf.has_option'"
    )
    # stacklevel=2 attributes the warning to the caller of this shim.
    warnings.warn(message, category=DeprecationWarning, stacklevel=2)
    return conf.has_option(*args, **kwargs)
| |
| |
def remove_option(*args, **kwargs) -> bool:
    """Deprecated module-level shim: warn, then forward all arguments to ``conf.remove_option``."""
    message = (
        "Accessing configuration method 'remove_option' directly from the configuration module is "
        "deprecated. Please access the configuration from the 'configuration.conf' object via "
        "'conf.remove_option'"
    )
    # stacklevel=2 attributes the warning to the caller of this shim.
    warnings.warn(message, category=DeprecationWarning, stacklevel=2)
    return conf.remove_option(*args, **kwargs)
| |
| |
def as_dict(*args, **kwargs) -> ConfigSourcesType:
    """Deprecated module-level shim: warn, then forward all arguments to ``conf.as_dict``."""
    message = (
        "Accessing configuration method 'as_dict' directly from the configuration module is "
        "deprecated. Please access the configuration from the 'configuration.conf' object via "
        "'conf.as_dict'"
    )
    # stacklevel=2 attributes the warning to the caller of this shim.
    warnings.warn(message, category=DeprecationWarning, stacklevel=2)
    return conf.as_dict(*args, **kwargs)
| |
| |
def set(*args, **kwargs) -> None:
    """Deprecated module-level shim: warn, then forward all arguments to ``conf.set``."""
    message = (
        "Accessing configuration method 'set' directly from the configuration module is "
        "deprecated. Please access the configuration from the 'configuration.conf' object via "
        "'conf.set'"
    )
    # stacklevel=2 attributes the warning to the caller of this shim.
    warnings.warn(message, category=DeprecationWarning, stacklevel=2)
    conf.set(*args, **kwargs)
| |
| |
def ensure_secrets_loaded() -> list[BaseSecretsBackend]:
    """
    Return the secrets backends, re-initialising them when only two are loaded.

    A module-level ``secrets_backend_list`` of exactly two entries is taken to hold only the
    default backends; in that case the result of ``initialize_secrets_backends()`` is returned
    instead of the existing list.
    """
    only_defaults_loaded = len(secrets_backend_list) == 2
    if not only_defaults_loaded:
        return secrets_backend_list
    return initialize_secrets_backends()
| |
| |
def get_custom_secret_backend() -> BaseSecretsBackend | None:
    """
    Instantiate the secrets backend configured in ``[secrets] backend``, if any.

    ``[secrets] backend_kwargs`` is read as JSON and passed as keyword arguments. When it is
    empty, cannot be parsed, or is not a dict, a warning is logged where applicable and the
    backend is created without keyword arguments.

    :return: the backend instance, or ``None`` when no backend class is configured
    """
    backend_cls = conf.getimport(section="secrets", key="backend")
    if not backend_cls:
        return None

    backend_kwargs = {}
    try:
        parsed_kwargs = conf.getjson(section="secrets", key="backend_kwargs")
    except AirflowConfigException:
        log.warning("Failed to parse [secrets] backend_kwargs as JSON, defaulting to no kwargs.")
    except ValueError:
        log.warning("Failed to parse [secrets] backend_kwargs into a dict, defaulting to no kwargs.")
    else:
        if parsed_kwargs:
            if isinstance(parsed_kwargs, dict):
                backend_kwargs = parsed_kwargs
            else:
                log.warning("Failed to parse [secrets] backend_kwargs into a dict, defaulting to no kwargs.")

    return backend_cls(**backend_kwargs)
| |
| |
def initialize_secrets_backends() -> list[BaseSecretsBackend]:
    """
    Build the list of secrets backend instances.

    The custom backend from ``get_custom_secret_backend()`` comes first when one is configured,
    followed by one instance of every class listed in ``DEFAULT_SECRETS_SEARCH_PATH``.
    """
    backends = []

    custom_backend = get_custom_secret_backend()
    if custom_backend is not None:
        backends.append(custom_backend)

    # Each default class is imported and instantiated in turn, in search-path order.
    backends.extend(import_string(class_path)() for class_path in DEFAULT_SECRETS_SEARCH_PATH)
    return backends
| |
| |
@functools.lru_cache(maxsize=None)
def _DEFAULT_CONFIG() -> str:
    """Return the text of "default_airflow.cfg"; the result is cached after the first call."""
    with open(_default_config_file_path("default_airflow.cfg")) as config_file:
        return config_file.read()
| |
| |
@functools.lru_cache(maxsize=None)
def _TEST_CONFIG() -> str:
    """Return the text of "default_test.cfg"; the result is cached after the first call."""
    with open(_default_config_file_path("default_test.cfg")) as config_file:
        return config_file.read()
| |
| |
# Deprecated module attributes, mapped to the zero-argument callables that compute their values.
# The module-level ``__getattr__`` looks names up in this table.
_deprecated = dict(
    DEFAULT_CONFIG=_DEFAULT_CONFIG,
    TEST_CONFIG=_TEST_CONFIG,
    TEST_CONFIG_FILE_PATH=functools.partial(_default_config_file_path, "default_test.cfg"),
    DEFAULT_CONFIG_FILE_PATH=functools.partial(_default_config_file_path, "default_airflow.cfg"),
)
| |
| |
def __getattr__(name):
    """
    Resolve deprecated module attributes lazily.

    Names listed in ``_deprecated`` produce a DeprecationWarning and the result of calling the
    associated callable; any other name raises ``AttributeError``.
    """
    if name not in _deprecated:
        raise AttributeError(f"module {__name__} has no attribute {name}")
    warnings.warn(
        f"{__name__}.{name} is deprecated and will be removed in future",
        DeprecationWarning,
        stacklevel=2,
    )
    factory = _deprecated[name]
    return factory()
| |
| |
def initialize_auth_manager() -> BaseAuthManager:
    """
    Import the class configured in ``[core] auth_manager`` and return an instance of it.

    :raises AirflowConfigException: when no auth manager class is configured
    """
    auth_manager_cls = conf.getimport(section="core", key="auth_manager")
    if auth_manager_cls:
        return auth_manager_cls()

    raise AirflowConfigException(
        "No auth manager defined in the config. "
        "Please specify one using section/key [core/auth_manager]."
    )
| |
| |
# Setting AIRFLOW_HOME and AIRFLOW_CONFIG from environment variables, using
# "~/airflow" and "$AIRFLOW_HOME/airflow.cfg" respectively as defaults.
AIRFLOW_HOME = get_airflow_home()
AIRFLOW_CONFIG = get_airflow_config(AIRFLOW_HOME)

# Set up dags folder for unit tests
# this directory won't exist if users install via pip
_TEST_DAGS_FOLDER = os.path.join(
    os.path.dirname(os.path.dirname(os.path.realpath(__file__))), "tests", "dags"
)
TEST_DAGS_FOLDER = (
    _TEST_DAGS_FOLDER if os.path.exists(_TEST_DAGS_FOLDER) else os.path.join(AIRFLOW_HOME, "dags")
)

# Set up plugins folder for unit tests
_TEST_PLUGINS_FOLDER = os.path.join(
    os.path.dirname(os.path.dirname(os.path.realpath(__file__))), "tests", "plugins"
)
TEST_PLUGINS_FOLDER = (
    _TEST_PLUGINS_FOLDER if os.path.exists(_TEST_PLUGINS_FOLDER) else os.path.join(AIRFLOW_HOME, "plugins")
)

SECRET_KEY = b64encode(os.urandom(16)).decode("utf-8")
FERNET_KEY = ""  # Set only if needed when generating a new file
WEBSERVER_CONFIG = ""  # Set by initialize_config

# Order matters: the configuration object is created first, then the secrets backends,
# and only after that the configuration is validated.
conf: AirflowConfigParser = initialize_config()
secrets_backend_list = initialize_secrets_backends()
conf.validate()