#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations
import atexit
import functools
import json
import logging
import os
import sys
import traceback
import warnings
from importlib import metadata
from typing import TYPE_CHECKING, Any, Callable
import pluggy
from packaging.version import Version
from sqlalchemy import create_engine, exc, text
from sqlalchemy.orm import scoped_session, sessionmaker
from sqlalchemy.pool import NullPool
from airflow import __version__ as airflow_version, policies
from airflow.configuration import AIRFLOW_HOME, WEBSERVER_CONFIG, conf # noqa: F401
from airflow.exceptions import AirflowInternalRuntimeError, RemovedInAirflow3Warning
from airflow.executors import executor_constants
from airflow.logging_config import configure_logging
from airflow.utils.orm_event_handlers import setup_event_handlers
from airflow.utils.sqlalchemy import is_sqlalchemy_v1
from airflow.utils.state import State
from airflow.utils.timezone import local_timezone, parse_timezone, utc
if TYPE_CHECKING:
from sqlalchemy.engine import Engine
from sqlalchemy.orm import Session as SASession
from airflow.www.utils import UIAlert
log = logging.getLogger(__name__)
try:
if (tz := conf.get_mandatory_value("core", "default_timezone")) != "system":
TIMEZONE = parse_timezone(tz)
else:
TIMEZONE = local_timezone()
except Exception:
TIMEZONE = utc
log.info("Configured default timezone %s", TIMEZONE)
HEADER = "\n".join(
[
r" ____________ _____________",
r" ____ |__( )_________ __/__ /________ __",
r"____ /| |_ /__ ___/_ /_ __ /_ __ \_ | /| / /",
r"___ ___ | / _ / _ __/ _ / / /_/ /_ |/ |/ /",
r" _/_/ |_/_/ /_/ /_/ /_/ \____/____/|__/",
]
)
LOGGING_LEVEL = logging.INFO
# The prefix prepended to gunicorn worker process titles once they have finished init
GUNICORN_WORKER_READY_PREFIX = "[ready] "
LOG_FORMAT = conf.get("logging", "log_format")
SIMPLE_LOG_FORMAT = conf.get("logging", "simple_log_format")
SQL_ALCHEMY_CONN: str | None = None
PLUGINS_FOLDER: str | None = None
LOGGING_CLASS_PATH: str | None = None
DONOT_MODIFY_HANDLERS: bool | None = None
DAGS_FOLDER: str = os.path.expanduser(conf.get_mandatory_value("core", "DAGS_FOLDER"))
engine: Engine
Session: Callable[..., SASession]
# The JSON library to use for DAG Serialization and De-Serialization
json = json
# Dictionary mapping each State to the color used to display it on the Webserver
STATE_COLORS = {
"deferred": "mediumpurple",
"failed": "red",
"queued": "gray",
"removed": "lightgrey",
"restarting": "violet",
"running": "lime",
"scheduled": "tan",
"skipped": "hotpink",
"success": "green",
"up_for_reschedule": "turquoise",
"up_for_retry": "gold",
"upstream_failed": "orange",
"shutdown": "blue",
}
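# These colors are merged into State.state_color in initialize() below and can be overridden from
# an airflow_local_settings.py picked up by import_local_settings(). A minimal sketch, assuming
# such a local settings file exists; the colors chosen are purely illustrative:
#
#   # airflow_local_settings.py
#   STATE_COLORS = {
#       "queued": "darkgray",
#       "running": "#01FF70",
#   }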
@functools.lru_cache(maxsize=None)
def _get_rich_console(file):
# Delay imports until we need it
import rich.console
return rich.console.Console(file=file)
def custom_show_warning(message, category, filename, lineno, file=None, line=None):
"""Print rich and visible warnings."""
# Delay imports until we need it
from rich.markup import escape
msg = f"[bold]{line}" if line else f"[bold][yellow]{filename}:{lineno}"
msg += f" {category.__name__}[/bold]: {escape(str(message))}[/yellow]"
write_console = _get_rich_console(file or sys.stderr)
write_console.print(msg, soft_wrap=True)
def replace_showwarning(replacement):
"""Replace ``warnings.showwarning``, returning the original.
This is useful since we want to "reset" the ``showwarning`` hook on exit to
avoid lazy-loading issues. If a warning is emitted after Python cleaned up
the import system, we would no longer be able to import ``rich``.
"""
original = warnings.showwarning
warnings.showwarning = replacement
return original
original_show_warning = replace_showwarning(custom_show_warning)
atexit.register(functools.partial(replace_showwarning, original_show_warning))
POLICY_PLUGIN_MANAGER: Any = None # type: ignore
def task_policy(task):
return POLICY_PLUGIN_MANAGER.hook.task_policy(task=task)
def dag_policy(dag):
return POLICY_PLUGIN_MANAGER.hook.dag_policy(dag=dag)
def task_instance_mutation_hook(task_instance):
return POLICY_PLUGIN_MANAGER.hook.task_instance_mutation_hook(task_instance=task_instance)
task_instance_mutation_hook.is_noop = True # type: ignore
def pod_mutation_hook(pod):
return POLICY_PLUGIN_MANAGER.hook.pod_mutation_hook(pod=pod)
def get_airflow_context_vars(context):
return POLICY_PLUGIN_MANAGER.hook.get_airflow_context_vars(context=context)
def get_dagbag_import_timeout(dag_file_path: str):
return POLICY_PLUGIN_MANAGER.hook.get_dagbag_import_timeout(dag_file_path=dag_file_path)
def configure_policy_plugin_manager():
global POLICY_PLUGIN_MANAGER
POLICY_PLUGIN_MANAGER = pluggy.PluginManager(policies.local_settings_hookspec.project_name)
POLICY_PLUGIN_MANAGER.add_hookspecs(policies)
POLICY_PLUGIN_MANAGER.register(policies.DefaultPolicy)
def load_policy_plugins(pm: pluggy.PluginManager):
# We can't log duration etc here, as logging hasn't yet been configured!
pm.load_setuptools_entrypoints("airflow.policy")
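# The hooks above dispatch to cluster policies supplied either as pluggy plugins or via
# airflow_local_settings.py (see import_local_settings() below). A minimal sketch, assuming such a
# local settings file exists; the rule enforced here is purely illustrative:
#
#   # airflow_local_settings.py
#   def task_policy(task):
#       # e.g. cap retries cluster-wide
#       if task.retries > 5:
#           task.retries = 5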
def configure_vars():
"""Configure Global Variables from airflow.cfg."""
global SQL_ALCHEMY_CONN
global DAGS_FOLDER
global PLUGINS_FOLDER
global DONOT_MODIFY_HANDLERS
SQL_ALCHEMY_CONN = conf.get("database", "SQL_ALCHEMY_CONN")
DAGS_FOLDER = os.path.expanduser(conf.get("core", "DAGS_FOLDER"))
PLUGINS_FOLDER = conf.get("core", "plugins_folder", fallback=os.path.join(AIRFLOW_HOME, "plugins"))
    # If donot_modify_handlers=True, we do not modify logging handlers in the task_run command.
    # If the flag is set to False, we remove all handlers from the root logger and add all
    # handlers from the 'airflow.task' logger to the root logger. This is done to capture all
    # logs from print & log statements in the DAG files before a task is run.
    # The handlers are restored after the task completes execution.
DONOT_MODIFY_HANDLERS = conf.getboolean("logging", "donot_modify_handlers", fallback=False)
def _run_openlineage_runtime_check():
"""
Ensure compatibility of OpenLineage provider package and Airflow version.
    Airflow 2.10.0 introduced core changes (#39336) that made OpenLineage provider versions <= 1.8.0
    incompatible with Airflow >= 2.10.0.
"""
ol_package = "apache-airflow-providers-openlineage"
try:
ol_version = metadata.version(ol_package)
except metadata.PackageNotFoundError:
return
if ol_version and Version(ol_version) < Version("1.8.0.dev0"):
raise RuntimeError(
f"You have installed `{ol_package}` == `{ol_version}` that is not compatible with "
f"`apache-airflow` == `{airflow_version}`. "
f"For `apache-airflow` >= `2.10.0` you must use `{ol_package}` >= `1.8.0`."
)
def run_providers_custom_runtime_checks():
_run_openlineage_runtime_check()
class SkipDBTestsSession:
"""
This fake session is used to skip DB tests when `_AIRFLOW_SKIP_DB_TESTS` is set.
:meta private:
"""
def __init__(self):
raise AirflowInternalRuntimeError(
"Your test accessed the DB but `_AIRFLOW_SKIP_DB_TESTS` is set.\n"
"Either make sure your test does not use database or mark the test with `@pytest.mark.db_test`\n"
"See https://github.com/apache/airflow/blob/main/contributing-docs/testing/unit_tests.rst#"
"best-practices-for-db-tests on how "
"to deal with it and consult examples."
)
def remove(*args, **kwargs):
pass
class TracebackSession:
"""
    Session that raises an error when you try to use it.
    Also stores the stack at the instantiation call site.
:meta private:
"""
def __init__(self):
self.traceback = traceback.extract_stack()
def __getattr__(self, item):
raise RuntimeError(
"TracebackSession object was used but internal API is enabled. "
"You'll need to ensure you are making only RPC calls with this object. "
"The stack list below will show where the TracebackSession object was created."
+ "\n".join(traceback.format_list(self.traceback))
)
def remove(*args, **kwargs):
pass
def configure_orm(disable_connection_pool=False, pool_class=None):
"""Configure ORM using SQLAlchemy."""
from airflow.utils.log.secrets_masker import mask_secret
if (
SQL_ALCHEMY_CONN
and SQL_ALCHEMY_CONN.startswith("sqlite")
and not SQL_ALCHEMY_CONN.startswith("sqlite:////")
        # An in-memory DB is not useful for production, but is useful for writing tests against Airflow for extensions
and SQL_ALCHEMY_CONN != "sqlite://"
):
from airflow.exceptions import AirflowConfigException
raise AirflowConfigException(
f"Cannot use relative path: `{SQL_ALCHEMY_CONN}` to connect to sqlite. "
"Please use absolute path such as `sqlite:////tmp/airflow.db`."
)
global Session
global engine
from airflow.api_internal.internal_api_call import InternalApiConfig
if InternalApiConfig.get_use_internal_api():
Session = TracebackSession
engine = None
return
elif os.environ.get("_AIRFLOW_SKIP_DB_TESTS") == "true":
# Skip DB initialization in unit tests, if DB tests are skipped
Session = SkipDBTestsSession
engine = None
return
log.debug("Setting up DB connection pool (PID %s)", os.getpid())
engine_args = prepare_engine_args(disable_connection_pool, pool_class)
if conf.has_option("database", "sql_alchemy_connect_args"):
connect_args = conf.getimport("database", "sql_alchemy_connect_args")
else:
connect_args = {}
engine = create_engine(SQL_ALCHEMY_CONN, connect_args=connect_args, **engine_args, future=True)
mask_secret(engine.url.password)
setup_event_handlers(engine)
Session = scoped_session(
sessionmaker(
autocommit=False,
autoflush=False,
bind=engine,
expire_on_commit=False,
)
)
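# Note that sql_alchemy_connect_args above is read with conf.getimport(), i.e. the config value is
# a dotted import path to an object holding the DBAPI connect arguments, not inline JSON.
# A minimal sketch, assuming a hypothetical my_company.airflow_settings module; the argument names
# depend on your DBAPI driver:
#
#   # airflow.cfg
#   # [database]
#   # sql_alchemy_connect_args = my_company.airflow_settings.CONNECT_ARGS
#
#   # my_company/airflow_settings.py
#   # CONNECT_ARGS = {"connect_timeout": 30}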
DEFAULT_ENGINE_ARGS = {
"postgresql": {
"executemany_mode": "values_plus_batch",
"executemany_values_page_size" if is_sqlalchemy_v1() else "insertmanyvalues_page_size": 10000,
"executemany_batch_page_size": 2000,
},
}
def prepare_engine_args(disable_connection_pool=False, pool_class=None):
"""Prepare SQLAlchemy engine args."""
default_args = {}
for dialect, default in DEFAULT_ENGINE_ARGS.items():
if SQL_ALCHEMY_CONN.startswith(dialect):
default_args = default.copy()
break
engine_args: dict = conf.getjson("database", "sql_alchemy_engine_args", fallback=default_args) # type: ignore
if pool_class:
# Don't use separate settings for size etc, only those from sql_alchemy_engine_args
engine_args["poolclass"] = pool_class
elif disable_connection_pool or not conf.getboolean("database", "SQL_ALCHEMY_POOL_ENABLED"):
engine_args["poolclass"] = NullPool
log.debug("settings.prepare_engine_args(): Using NullPool")
elif not SQL_ALCHEMY_CONN.startswith("sqlite"):
# Pool size engine args not supported by sqlite.
# If no config value is defined for the pool size, select a reasonable value.
# 0 means no limit, which could lead to exceeding the Database connection limit.
pool_size = conf.getint("database", "SQL_ALCHEMY_POOL_SIZE", fallback=5)
# The maximum overflow size of the pool.
# When the number of checked-out connections reaches the size set in pool_size,
# additional connections will be returned up to this limit.
# When those additional connections are returned to the pool, they are disconnected and discarded.
# It follows then that the total number of simultaneous connections
# the pool will allow is pool_size + max_overflow,
# and the total number of "sleeping" connections the pool will allow is pool_size.
# max_overflow can be set to -1 to indicate no overflow limit;
# no limit will be placed on the total number
# of concurrent connections. Defaults to 10.
max_overflow = conf.getint("database", "SQL_ALCHEMY_MAX_OVERFLOW", fallback=10)
# The DB server already has a value for wait_timeout (number of seconds after
# which an idle sleeping connection should be killed). Since other DBs may
# co-exist on the same server, SQLAlchemy should set its
# pool_recycle to an equal or smaller value.
pool_recycle = conf.getint("database", "SQL_ALCHEMY_POOL_RECYCLE", fallback=1800)
# Check connection at the start of each connection pool checkout.
# Typically, this is a simple statement like "SELECT 1", but may also make use
# of some DBAPI-specific method to test the connection for liveness.
# More information here:
# https://docs.sqlalchemy.org/en/14/core/pooling.html#disconnect-handling-pessimistic
pool_pre_ping = conf.getboolean("database", "SQL_ALCHEMY_POOL_PRE_PING", fallback=True)
log.debug(
"settings.prepare_engine_args(): Using pool settings. pool_size=%d, max_overflow=%d, "
"pool_recycle=%d, pid=%d",
pool_size,
max_overflow,
pool_recycle,
os.getpid(),
)
engine_args["pool_size"] = pool_size
engine_args["pool_recycle"] = pool_recycle
engine_args["pool_pre_ping"] = pool_pre_ping
engine_args["max_overflow"] = max_overflow
# The default isolation level for MySQL (REPEATABLE READ) can introduce inconsistencies when
# running multiple schedulers, as repeated queries on the same session may read from stale snapshots.
# 'READ COMMITTED' is the default value for PostgreSQL.
# More information here:
    # https://dev.mysql.com/doc/refman/8.0/en/innodb-transaction-isolation-levels.html
if SQL_ALCHEMY_CONN.startswith("mysql"):
engine_args["isolation_level"] = "READ COMMITTED"
if is_sqlalchemy_v1():
# Allow the user to specify an encoding for their DB otherwise default
# to utf-8 so jobs & users with non-latin1 characters can still use us.
# This parameter was removed in SQLAlchemy 2.x.
engine_args["encoding"] = conf.get("database", "SQL_ENGINE_ENCODING", fallback="utf-8")
return engine_args
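# The base engine args above come from conf.getjson("database", "sql_alchemy_engine_args"), so
# arbitrary create_engine() keyword arguments can be supplied as JSON. A hedged example (the values
# are illustrative, not recommendations); note that the individual SQL_ALCHEMY_POOL_* options
# handled above still overwrite the keys they set:
#
#   # airflow.cfg
#   # [database]
#   # sql_alchemy_engine_args = {"pool_timeout": 30, "echo": false}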
def dispose_orm():
"""Properly close pooled database connections."""
log.debug("Disposing DB connection pool (PID %s)", os.getpid())
global engine
global Session
if Session is not None: # type: ignore[truthy-function]
Session.remove()
Session = None
if engine:
engine.dispose()
engine = None
def reconfigure_orm(disable_connection_pool=False, pool_class=None):
"""Properly close database connections and re-configure ORM."""
dispose_orm()
configure_orm(disable_connection_pool=disable_connection_pool, pool_class=pool_class)
def configure_adapters():
"""Register Adapters and DB Converters."""
from pendulum import DateTime as Pendulum
if SQL_ALCHEMY_CONN.startswith("sqlite"):
from sqlite3 import register_adapter
register_adapter(Pendulum, lambda val: val.isoformat(" "))
if SQL_ALCHEMY_CONN.startswith("mysql"):
try:
import MySQLdb.converters
MySQLdb.converters.conversions[Pendulum] = MySQLdb.converters.DateTime2literal
except ImportError:
pass
try:
import pymysql.converters
pymysql.converters.conversions[Pendulum] = pymysql.converters.escape_datetime
except ImportError:
pass
def validate_session():
"""Validate ORM Session."""
global engine
worker_precheck = conf.getboolean("celery", "worker_precheck")
if not worker_precheck:
return True
else:
check_session = sessionmaker(bind=engine)
session = check_session()
try:
session.execute(text("select 1"))
conn_status = True
except exc.DBAPIError as err:
log.error(err)
conn_status = False
session.close()
return conn_status
def configure_action_logging() -> None:
"""Any additional configuration (register callback) for airflow.utils.action_loggers module."""
def prepare_syspath():
"""Ensure certain subfolders of AIRFLOW_HOME are on the classpath."""
if DAGS_FOLDER not in sys.path:
sys.path.append(DAGS_FOLDER)
    # Add ./config/ for loading custom log parsers, airflow_local_settings, etc.
config_path = os.path.join(AIRFLOW_HOME, "config")
if config_path not in sys.path:
sys.path.append(config_path)
if PLUGINS_FOLDER not in sys.path:
sys.path.append(PLUGINS_FOLDER)
def get_session_lifetime_config():
"""Get session timeout configs and handle outdated configs gracefully."""
session_lifetime_minutes = conf.get("webserver", "session_lifetime_minutes", fallback=None)
session_lifetime_days = conf.get("webserver", "session_lifetime_days", fallback=None)
uses_deprecated_lifetime_configs = session_lifetime_days or conf.get(
"webserver", "force_log_out_after", fallback=None
)
minutes_per_day = 24 * 60
default_lifetime_minutes = "43200"
if uses_deprecated_lifetime_configs and session_lifetime_minutes == default_lifetime_minutes:
warnings.warn(
"`session_lifetime_days` option from `[webserver]` section has been "
"renamed to `session_lifetime_minutes`. The new option allows to configure "
"session lifetime in minutes. The `force_log_out_after` option has been removed "
"from `[webserver]` section. Please update your configuration.",
category=RemovedInAirflow3Warning,
stacklevel=2,
)
if session_lifetime_days:
session_lifetime_minutes = minutes_per_day * int(session_lifetime_days)
if not session_lifetime_minutes:
session_lifetime_days = 30
session_lifetime_minutes = minutes_per_day * session_lifetime_days
log.debug("User session lifetime is set to %s minutes.", session_lifetime_minutes)
return int(session_lifetime_minutes)
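# A hedged illustration of the config migration handled above; the values are examples only
# (14 days == 20160 minutes):
#
#   # airflow.cfg -- deprecated form (still honored, but triggers the RemovedInAirflow3Warning)
#   # [webserver]
#   # session_lifetime_days = 14
#
#   # airflow.cfg -- current form
#   # [webserver]
#   # session_lifetime_minutes = 20160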
def import_local_settings():
"""Import airflow_local_settings.py files to allow overriding any configs in settings.py file."""
try:
import airflow_local_settings
except ModuleNotFoundError as e:
if e.name == "airflow_local_settings":
log.debug("No airflow_local_settings to import.", exc_info=True)
else:
log.critical(
"Failed to import airflow_local_settings due to a transitive module not found error.",
exc_info=True,
)
raise
except ImportError:
log.critical("Failed to import airflow_local_settings.", exc_info=True)
raise
else:
if hasattr(airflow_local_settings, "__all__"):
names = set(airflow_local_settings.__all__)
else:
names = {n for n in airflow_local_settings.__dict__ if not n.startswith("__")}
if "policy" in names and "task_policy" not in names:
warnings.warn(
"Using `policy` in airflow_local_settings.py is deprecated. "
"Please rename your `policy` to `task_policy`.",
RemovedInAirflow3Warning,
stacklevel=2,
)
setattr(airflow_local_settings, "task_policy", airflow_local_settings.policy)
names.remove("policy")
plugin_functions = policies.make_plugin_from_local_settings(
POLICY_PLUGIN_MANAGER, airflow_local_settings, names
)
# If we have already handled a function by adding it to the plugin,
# then don't clobber the global function
for name in names - plugin_functions:
globals()[name] = getattr(airflow_local_settings, name)
if POLICY_PLUGIN_MANAGER.hook.task_instance_mutation_hook.get_hookimpls():
task_instance_mutation_hook.is_noop = False
log.info("Loaded airflow_local_settings from %s .", airflow_local_settings.__file__)
def initialize():
"""Initialize Airflow with all the settings from this file."""
configure_vars()
prepare_syspath()
configure_policy_plugin_manager()
# Load policy plugins _before_ importing airflow_local_settings, as Pluggy uses LIFO and we want anything
    # in airflow_local_settings to take precedence.
load_policy_plugins(POLICY_PLUGIN_MANAGER)
import_local_settings()
global LOGGING_CLASS_PATH
LOGGING_CLASS_PATH = configure_logging()
State.state_color.update(STATE_COLORS)
configure_adapters()
# The webservers import this file from models.py with the default settings.
configure_orm()
configure_action_logging()
    # Run any custom runtime checks that need to be executed for providers
run_providers_custom_runtime_checks()
# Ensure we close DB connections at scheduler and gunicorn worker terminations
atexit.register(dispose_orm)
def is_usage_data_collection_enabled() -> bool:
"""Check if data collection is enabled."""
return conf.getboolean("usage_data_collection", "enabled", fallback=True) and (
os.getenv("SCARF_ANALYTICS", "").strip().lower() != "false"
)
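# Both switches above must allow collection: the [usage_data_collection] "enabled" option and the
# SCARF_ANALYTICS environment variable. A minimal sketch of opting out, either way:
#
#   # airflow.cfg
#   # [usage_data_collection]
#   # enabled = False
#
#   # or via the environment:
#   # export SCARF_ANALYTICS=false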
# Const stuff
KILOBYTE = 1024
MEGABYTE = KILOBYTE * KILOBYTE
WEB_COLORS = {"LIGHTBLUE": "#4d9de0", "LIGHTORANGE": "#FF9933"}
# Serialized DAGs are not updated more often than this minimum interval, to reduce the database
# write rate.
MIN_SERIALIZED_DAG_UPDATE_INTERVAL = conf.getint("core", "min_serialized_dag_update_interval", fallback=30)
# If set to True, serialized DAGs are compressed before being written to the DB.
COMPRESS_SERIALIZED_DAGS = conf.getboolean("core", "compress_serialized_dags", fallback=False)
# Serialized DAGs are not fetched more often than this minimum interval, to reduce the database
# read rate. This config controls how quickly updated DAGs appear in the Webserver.
MIN_SERIALIZED_DAG_FETCH_INTERVAL = conf.getint("core", "min_serialized_dag_fetch_interval", fallback=10)
CAN_FORK = hasattr(os, "fork")
EXECUTE_TASKS_NEW_PYTHON_INTERPRETER = not CAN_FORK or conf.getboolean(
"core",
"execute_tasks_new_python_interpreter",
fallback=False,
)
ALLOW_FUTURE_EXEC_DATES = conf.getboolean("scheduler", "allow_trigger_in_future", fallback=False)
# Whether or not to check each dagrun against defined SLAs
CHECK_SLAS = conf.getboolean("core", "check_slas", fallback=True)
USE_JOB_SCHEDULE = conf.getboolean("scheduler", "use_job_schedule", fallback=True)
# By default Airflow plugins are lazily loaded (only loaded when required). Set it to False
# if you want plugins to be loaded whenever 'airflow' is invoked via the CLI or imported as a module.
LAZY_LOAD_PLUGINS: bool = conf.getboolean("core", "lazy_load_plugins", fallback=True)
# By default Airflow providers are lazily discovered (discovery and imports happen only when required).
# Set it to False if you want providers to be discovered whenever 'airflow' is invoked via the CLI
# or imported as a module.
LAZY_LOAD_PROVIDERS: bool = conf.getboolean("core", "lazy_discover_providers", fallback=True)
# Determines if the executor utilizes Kubernetes
IS_K8S_OR_K8SCELERY_EXECUTOR = conf.get("core", "EXECUTOR") in {
executor_constants.KUBERNETES_EXECUTOR,
executor_constants.CELERY_KUBERNETES_EXECUTOR,
executor_constants.LOCAL_KUBERNETES_EXECUTOR,
}
# Executors can set this to true to configure logging correctly for
# containerized executors.
IS_EXECUTOR_CONTAINER = bool(os.environ.get("AIRFLOW_IS_EXECUTOR_CONTAINER", ""))
IS_K8S_EXECUTOR_POD = bool(os.environ.get("AIRFLOW_IS_K8S_EXECUTOR_POD", ""))
"""Will be True if running in kubernetes executor pod."""
HIDE_SENSITIVE_VAR_CONN_FIELDS = conf.getboolean("core", "hide_sensitive_var_conn_fields")
# By default this is off, but is automatically configured on when running task
# instances
MASK_SECRETS_IN_LOGS = False
# Display alerts on the dashboard
# Useful for warning about setup issues or announcing changes to end users
# List of UIAlerts, which allows for specifying the message, category, and roles the
# message should be shown to. For example:
# from airflow.www.utils import UIAlert
#
# DASHBOARD_UIALERTS = [
# UIAlert("Welcome to Airflow"), # All users
# UIAlert("Airflow update happening next week", roles=["User"]), # Only users with the User role
# # A flash message with html:
# UIAlert('Visit <a href="http://airflow.apache.org">airflow.apache.org</a>', html=True),
# ]
#
DASHBOARD_UIALERTS: list[UIAlert] = []
# Prefix used to identify tables holding data moved during migration.
AIRFLOW_MOVED_TABLE_PREFIX = "_airflow_moved"
DAEMON_UMASK: str = conf.get("core", "daemon_umask", fallback="0o077")
# AIP-44: internal_api (experimental)
# This feature is not complete yet, so we disable it by default.
_ENABLE_AIP_44: bool = os.environ.get("AIRFLOW_ENABLE_AIP_44", "false").lower() in {
"true",
"t",
"yes",
"y",
"1",
}