blob: b59fea7893421797d4337b2f7ca4bcea936c0e2f [file] [log] [blame]
#!/usr/bin/env python
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""Explicit configuration and definition of Airflow CLI commands."""
from __future__ import annotations
import argparse
import json
import os
import textwrap
from argparse import ArgumentError
from typing import Callable, Iterable, NamedTuple, Union
import lazy_object_proxy
from airflow import settings
from airflow.cli.commands.legacy_commands import check_legacy_command
from airflow.configuration import conf
from airflow.executors.executor_constants import CELERY_EXECUTOR, CELERY_KUBERNETES_EXECUTOR
from airflow.executors.executor_loader import ExecutorLoader
from airflow.settings import _ENABLE_AIP_44
from airflow.utils.cli import ColorMode
from airflow.utils.module_loading import import_string
from airflow.utils.state import DagRunState
from airflow.utils.timezone import parse as parsedate
BUILD_DOCS = "BUILDING_AIRFLOW_DOCS" in os.environ
def lazy_load_command(import_path: str) -> Callable:
    """Return a stand-in callable that imports the real command only when it is called.

    :param import_path: dotted path of the command function to load on demand.
    :return: a wrapper named after the last path component; every call imports the
        target via ``import_string`` and forwards all arguments to it.
    """
    command_name = import_path.rpartition(".")[2]

    def command(*args, **kwargs):
        return import_string(import_path)(*args, **kwargs)

    # Expose the target's name instead of the generic wrapper name.
    command.__name__ = command_name
    return command
class DefaultHelpParser(argparse.ArgumentParser):
    """Argument parser that vets optional subcommands and prints the full help on errors."""

    def _check_value(self, action, value):
        """Validate ``value`` for ``action``, vetting conditionally available subcommands first."""
        choosing_subcommand = action.dest == "subcommand"
        if choosing_subcommand and value == "celery":
            self._ensure_celery_executor(action)
        if choosing_subcommand and value == "kubernetes":
            self._ensure_kubernetes_client(action)

        # Values outside the declared choices go through check_legacy_command before
        # argparse performs its own validation.
        if action.choices is not None and value not in action.choices:
            check_legacy_command(action, value)

        super()._check_value(action, value)

    def _ensure_celery_executor(self, action):
        """Raise ``ArgumentError`` unless the configured executor is Celery-based."""
        configured = conf.get("core", "EXECUTOR")
        if configured in (CELERY_EXECUTOR, CELERY_KUBERNETES_EXECUTOR):
            return

        executor_cls, _ = ExecutorLoader.import_executor_cls(configured)
        try:
            from airflow.executors.celery_executor import CeleryExecutor
        except ImportError:
            raise ArgumentError(
                action,
                "The celery subcommand requires that you pip install the celery module. "
                "To do it, run: pip install 'apache-airflow[celery]'",
            )
        accepted_bases = (CeleryExecutor,)

        try:
            from airflow.executors.celery_kubernetes_executor import CeleryKubernetesExecutor
        except ImportError:
            # Without it, only CeleryExecutor is accepted as a base class.
            pass
        else:
            accepted_bases += (CeleryKubernetesExecutor,)

        if issubclass(executor_cls, accepted_bases):
            return

        parent_names = ", ".join([base_cls.__qualname__ for base_cls in executor_cls.__bases__])
        raise ArgumentError(
            action,
            "celery subcommand works only with CeleryExecutor, CeleryKubernetesExecutor and "
            f"executors derived from them, your current executor: {configured}, subclassed from: "
            f"{parent_names}",
        )

    def _ensure_kubernetes_client(self, action):
        """Raise ``ArgumentError`` when the kubernetes python client cannot be imported."""
        try:
            import kubernetes.client  # noqa: F401
        except ImportError:
            raise ArgumentError(
                action,
                "The kubernetes subcommand requires that you pip install the kubernetes python client. "
                "To do it, run: pip install 'apache-airflow[cncf.kubernetes]'",
            )

    def error(self, message):
        """Print the full help (via ``print_help``) and exit with status 2."""
        self.print_help()
        failure = f"\n{self.prog} command error: {message}, see help above.\n"
        self.exit(2, failure)
# Used in Arg to enable ``None`` as a distinct value from "not passed"
_UNSET = object()


class Arg:
    """Class to keep information about command line argument.

    ``flags`` holds the positional name or option strings. Every other constructor
    argument that was explicitly supplied (``None`` included) is stored in ``kwargs``
    and later forwarded to ``ArgumentParser.add_argument``.
    """

    def __init__(
        self,
        flags=_UNSET,
        help=_UNSET,
        action=_UNSET,
        default=_UNSET,
        nargs=_UNSET,
        type=_UNSET,
        choices=_UNSET,
        required=_UNSET,
        metavar=_UNSET,
        dest=_UNSET,
    ):
        self.flags = flags
        supplied = {
            "help": help,
            "action": action,
            "default": default,
            "nargs": nargs,
            "type": type,
            "choices": choices,
            "required": required,
            "metavar": metavar,
            "dest": dest,
        }
        # Keep only what the caller actually passed so argparse applies its own defaults.
        self.kwargs = {key: value for key, value in supplied.items() if value is not _UNSET}

    def add_to_parser(self, parser: argparse.ArgumentParser):
        """Add this argument to an ArgumentParser.

        An argument whose metavar is ``DIRPATH`` and that declares no ``type`` gets a
        validator that reports a missing directory through ``parser.error``.
        """
        # Work on a copy: the validator closes over ``parser``, so storing it on the
        # shared ``self.kwargs`` would make later parsers report through the first one.
        kwargs = dict(self.kwargs)
        if kwargs.get("metavar") == "DIRPATH" and "type" not in kwargs:
            kwargs["type"] = lambda value: self._is_valid_directory(parser, value)
        parser.add_argument(*self.flags, **kwargs)

    def _is_valid_directory(self, parser, arg):
        """Return ``arg`` if it is an existing directory, otherwise call ``parser.error``."""
        if not os.path.isdir(arg):
            parser.error(f"The directory '{arg}' does not exist!")
        return arg
def positive_int(*, allow_zero):
    """Build an argparse ``type`` callable that only accepts positive integers.

    :param allow_zero: when true, ``0`` is accepted as well.
    :return: a converter that returns the parsed ``int``, or raises
        ``argparse.ArgumentTypeError`` when the value is not an integer or is too small.
    """
    smallest_allowed = 0 if allow_zero else 1

    def _check(value):
        try:
            value = int(value)
        except ValueError:
            # Not an integer at all; reported below with the raw input.
            pass
        else:
            if value >= smallest_allowed:
                return value
        raise argparse.ArgumentTypeError(f"invalid positive int value: '{value}'")

    return _check
def string_list_type(val):
    """Split ``val`` on commas and return the pieces with surrounding whitespace removed."""
    pieces = val.split(",")
    return list(map(str.strip, pieces))
def string_lower_type(val):
    """Return ``val`` stripped and lower-cased, or ``None`` when ``val`` is falsy."""
    return val.strip().lower() if val else None
# Shared
ARG_DAG_ID = Arg(("dag_id",), help="The id of the dag")
ARG_TASK_ID = Arg(("task_id",), help="The id of the task")
ARG_EXECUTION_DATE = Arg(("execution_date",), help="The execution date of the DAG", type=parsedate)
ARG_EXECUTION_DATE_OPTIONAL = Arg(
("execution_date",), nargs="?", help="The execution date of the DAG (optional)", type=parsedate
)
ARG_EXECUTION_DATE_OR_RUN_ID = Arg(
("execution_date_or_run_id",), help="The execution_date of the DAG or run_id of the DAGRun"
)
ARG_EXECUTION_DATE_OR_RUN_ID_OPTIONAL = Arg(
("execution_date_or_run_id",),
nargs="?",
help="The execution_date of the DAG or run_id of the DAGRun (optional)",
)
ARG_TASK_REGEX = Arg(
("-t", "--task-regex"), help="The regex to filter specific task_ids to backfill (optional)"
)
ARG_SUBDIR = Arg(
("-S", "--subdir"),
help=(
"File location or directory from which to look for the dag. "
"Defaults to '[AIRFLOW_HOME]/dags' where [AIRFLOW_HOME] is the "
"value you set for 'AIRFLOW_HOME' config you set in 'airflow.cfg' "
),
default="[AIRFLOW_HOME]/dags" if BUILD_DOCS else settings.DAGS_FOLDER,
)
ARG_START_DATE = Arg(("-s", "--start-date"), help="Override start_date YYYY-MM-DD", type=parsedate)
ARG_END_DATE = Arg(("-e", "--end-date"), help="Override end_date YYYY-MM-DD", type=parsedate)
ARG_OUTPUT_PATH = Arg(
(
"-o",
"--output-path",
),
help="The output for generated yaml files",
type=str,
default="[CWD]" if BUILD_DOCS else os.getcwd(),
)
ARG_DRY_RUN = Arg(
("-n", "--dry-run"),
help="Perform a dry run for each task. Only renders Template Fields for each task, nothing else",
action="store_true",
)
ARG_PID = Arg(("--pid",), help="PID file location", nargs="?")
ARG_DAEMON = Arg(
("-D", "--daemon"), help="Daemonize instead of running in the foreground", action="store_true"
)
ARG_STDERR = Arg(("--stderr",), help="Redirect stderr to this file")
ARG_STDOUT = Arg(("--stdout",), help="Redirect stdout to this file")
ARG_LOG_FILE = Arg(("-l", "--log-file"), help="Location of the log file")
ARG_YES = Arg(
("-y", "--yes"),
help="Do not prompt to confirm. Use with care!",
action="store_true",
default=False,
)
ARG_OUTPUT = Arg(
(
"-o",
"--output",
),
help="Output format. Allowed values: json, yaml, plain, table (default: table)",
metavar="(table, json, yaml, plain)",
choices=("table", "json", "yaml", "plain"),
default="table",
)
ARG_COLOR = Arg(
    ("--color",),
    help="Do emit colored output (default: auto)",
    # A tuple keeps the choices in declaration order wherever they are listed;
    # a set has no defined iteration order.
    choices=(ColorMode.ON, ColorMode.OFF, ColorMode.AUTO),
    default=ColorMode.AUTO,
)
# DB args
ARG_VERSION_RANGE = Arg(
("-r", "--range"),
help="Version range(start:end) for offline sql generation. Example: '2.0.2:2.2.3'",
default=None,
)
ARG_REVISION_RANGE = Arg(
("--revision-range",),
help=(
"Migration revision range(start:end) to use for offline sql generation. "
"Example: ``a13f7613ad25:7b2661a43ba3``"
),
default=None,
)
# list_dag_runs
ARG_DAG_ID_REQ_FLAG = Arg(
("-d", "--dag-id"), required=True, help="The id of the dag"
) # TODO: convert this to a positional arg in Airflow 3
ARG_NO_BACKFILL = Arg(
("--no-backfill",), help="filter all the backfill dagruns given the dag id", action="store_true"
)
dagrun_states = tuple(state.value for state in DagRunState)
ARG_STATE = Arg(
("--state",),
help="Only list the dag runs corresponding to the state",
metavar=", ".join(dagrun_states),
choices=dagrun_states,
)
# list_jobs
ARG_DAG_ID_OPT = Arg(("-d", "--dag-id"), help="The id of the dag")
ARG_LIMIT = Arg(("--limit",), help="Return a limited number of records")
# next_execution
ARG_NUM_EXECUTIONS = Arg(
("-n", "--num-executions"),
default=1,
type=positive_int(allow_zero=False),
help="The number of next execution datetimes to show",
)
# backfill
ARG_MARK_SUCCESS = Arg(
("-m", "--mark-success"), help="Mark jobs as succeeded without running them", action="store_true"
)
ARG_VERBOSE = Arg(("-v", "--verbose"), help="Make logging output more verbose", action="store_true")
ARG_LOCAL = Arg(("-l", "--local"), help="Run the task using the LocalExecutor", action="store_true")
ARG_DONOT_PICKLE = Arg(
("-x", "--donot-pickle"),
help=(
"Do not attempt to pickle the DAG object to send over "
"to the workers, just tell the workers to run their version "
"of the code"
),
action="store_true",
)
ARG_BF_IGNORE_DEPENDENCIES = Arg(
("-i", "--ignore-dependencies"),
help=(
"Skip upstream tasks, run only the tasks "
"matching the regexp. Only works in conjunction "
"with task_regex"
),
action="store_true",
)
ARG_BF_IGNORE_FIRST_DEPENDS_ON_PAST = Arg(
("-I", "--ignore-first-depends-on-past"),
help=(
"Ignores depends_on_past dependencies for the first "
"set of tasks only (subsequent executions in the backfill "
"DO respect depends_on_past)"
),
action="store_true",
)
ARG_POOL = Arg(
    ("--pool",),
    help="Resource pool to use",
)
ARG_DELAY_ON_LIMIT = Arg(
("--delay-on-limit",),
help=(
"Amount of time in seconds to wait when the limit "
"on maximum active dag runs (max_active_runs) has "
"been reached before trying to execute a dag run "
"again"
),
type=float,
default=1.0,
)
ARG_RESET_DAG_RUN = Arg(
("--reset-dagruns",),
help=(
"if set, the backfill will delete existing "
"backfill-related DAG runs and start "
"anew with fresh, running DAG runs"
),
action="store_true",
)
ARG_RERUN_FAILED_TASKS = Arg(
("--rerun-failed-tasks",),
help=(
"if set, the backfill will auto-rerun "
"all the failed tasks for the backfill date range "
"instead of throwing exceptions"
),
action="store_true",
)
ARG_CONTINUE_ON_FAILURES = Arg(
("--continue-on-failures",),
help=("if set, the backfill will keep going even if some of the tasks failed"),
action="store_true",
)
ARG_DISABLE_RETRY = Arg(
("--disable-retry",),
help=("if set, the backfill will set tasks as failed without retrying."),
action="store_true",
)
ARG_RUN_BACKWARDS = Arg(
(
"-B",
"--run-backwards",
),
help=(
"if set, the backfill will run tasks from the most "
"recent day first. if there are tasks that depend_on_past "
"this option will throw an exception"
),
action="store_true",
)
ARG_TREAT_DAG_AS_REGEX = Arg(
("--treat-dag-as-regex",),
help=("if set, dag_id will be treated as regex instead of an exact string"),
action="store_true",
)
# test_dag
ARG_SHOW_DAGRUN = Arg(
("--show-dagrun",),
help=(
"After completing the backfill, shows the diagram for current DAG Run.\n"
"\n"
"The diagram is in DOT language\n"
),
action="store_true",
)
ARG_IMGCAT_DAGRUN = Arg(
("--imgcat-dagrun",),
help=(
"After completing the dag run, prints a diagram on the screen for the "
"current DAG Run using the imgcat tool.\n"
),
action="store_true",
)
ARG_SAVE_DAGRUN = Arg(
("--save-dagrun",),
help="After completing the backfill, saves the diagram for current DAG Run to the indicated file.\n\n",
)
# list_tasks
ARG_TREE = Arg(("-t", "--tree"), help="Tree view", action="store_true")
# tasks_run
# This is a hidden option -- not meant for users to set or know about
ARG_SHUT_DOWN_LOGGING = Arg(
("--no-shut-down-logging",),
help=argparse.SUPPRESS,
dest="shut_down_logging",
action="store_false",
default=True,
)
# clear
ARG_UPSTREAM = Arg(("-u", "--upstream"), help="Include upstream tasks", action="store_true")
ARG_ONLY_FAILED = Arg(("-f", "--only-failed"), help="Only failed jobs", action="store_true")
ARG_ONLY_RUNNING = Arg(("-r", "--only-running"), help="Only running jobs", action="store_true")
ARG_DOWNSTREAM = Arg(("-d", "--downstream"), help="Include downstream tasks", action="store_true")
ARG_EXCLUDE_SUBDAGS = Arg(("-x", "--exclude-subdags"), help="Exclude subdags", action="store_true")
ARG_EXCLUDE_PARENTDAG = Arg(
("-X", "--exclude-parentdag"),
help="Exclude ParentDAGS if the task cleared is a part of a SubDAG",
action="store_true",
)
ARG_DAG_REGEX = Arg(
("-R", "--dag-regex"), help="Search dag_id as regex instead of exact string", action="store_true"
)
# show_dag
ARG_SAVE = Arg(("-s", "--save"), help="Saves the result to the indicated file.")
ARG_IMGCAT = Arg(("--imgcat",), help="Displays graph using the imgcat tool.", action="store_true")
# trigger_dag
ARG_RUN_ID = Arg(("-r", "--run-id"), help="Helps to identify this run")
ARG_CONF = Arg(("-c", "--conf"), help="JSON string that gets pickled into the DagRun's conf attribute")
ARG_EXEC_DATE = Arg(("-e", "--exec-date"), help="The execution date of the DAG", type=parsedate)
ARG_REPLACE_MICRO = Arg(
("--no-replace-microseconds",),
help="whether microseconds should be zeroed",
dest="replace_microseconds",
action="store_false",
default=True,
)
# db
ARG_DB_TABLES = Arg(
("-t", "--tables"),
help=lazy_object_proxy.Proxy(
lambda: f"Table names to perform maintenance on (use comma-separated list).\n"
f"Options: {import_string('airflow.cli.commands.db_command.all_tables')}"
),
type=string_list_type,
)
ARG_DB_CLEANUP_TIMESTAMP = Arg(
("--clean-before-timestamp",),
help="The date or timestamp before which data should be purged.\n"
"If no timezone info is supplied then dates are assumed to be in airflow default timezone.\n"
"Example: '2022-01-01 00:00:00+01:00'",
type=parsedate,
required=True,
)
ARG_DB_DRY_RUN = Arg(
("--dry-run",),
help="Perform a dry run",
action="store_true",
)
ARG_DB_SKIP_ARCHIVE = Arg(
("--skip-archive",),
help="Don't preserve purged records in an archive table.",
action="store_true",
)
ARG_DB_EXPORT_FORMAT = Arg(
("--export-format",),
help="The file format to export the cleaned data",
choices=("csv",),
default="csv",
)
ARG_DB_OUTPUT_PATH = Arg(
("--output-path",),
metavar="DIRPATH",
help="The path to the output directory to export the cleaned data. This directory must exist.",
required=True,
)
ARG_DB_DROP_ARCHIVES = Arg(
("--drop-archives",),
help="Drop the archive tables after exporting. Use with caution.",
action="store_true",
)
# pool
ARG_POOL_NAME = Arg(("pool",), metavar="NAME", help="Pool name")
ARG_POOL_SLOTS = Arg(("slots",), type=int, help="Pool slots")
ARG_POOL_DESCRIPTION = Arg(("description",), help="Pool description")
ARG_POOL_IMPORT = Arg(
("file",),
metavar="FILEPATH",
help="Import pools from JSON file. Example format::\n"
+ textwrap.indent(
textwrap.dedent(
"""
{
"pool_1": {"slots": 5, "description": ""},
"pool_2": {"slots": 10, "description": "test"}
}"""
),
" " * 4,
),
)
ARG_POOL_EXPORT = Arg(("file",), metavar="FILEPATH", help="Export all pools to JSON file")
# variables
ARG_VAR = Arg(("key",), help="Variable key")
ARG_VAR_VALUE = Arg(("value",), metavar="VALUE", help="Variable value")
ARG_DEFAULT = Arg(
("-d", "--default"), metavar="VAL", default=None, help="Default value returned if variable does not exist"
)
ARG_DESERIALIZE_JSON = Arg(("-j", "--json"), help="Deserialize JSON variable", action="store_true")
ARG_SERIALIZE_JSON = Arg(("-j", "--json"), help="Serialize JSON variable", action="store_true")
ARG_VAR_IMPORT = Arg(("file",), help="Import variables from JSON file")
ARG_VAR_EXPORT = Arg(("file",), help="Export all variables to JSON file")
# kerberos
ARG_PRINCIPAL = Arg(("principal",), help="kerberos principal", nargs="?")
ARG_KEYTAB = Arg(("-k", "--keytab"), help="keytab", nargs="?", default=conf.get("kerberos", "keytab"))
# run
ARG_INTERACTIVE = Arg(
("-N", "--interactive"),
help="Do not capture standard output and error streams (useful for interactive debugging)",
action="store_true",
)
# TODO(aoen): "force" is a poor choice of name here since it implies it overrides
# all dependencies (not just past success), e.g. the ignore_depends_on_past
# dependency. This flag should be deprecated and renamed to 'ignore_ti_state' and
# the "ignore_all_dependencies" command should be called the "force" command
# instead.
ARG_FORCE = Arg(
("-f", "--force"),
help="Ignore previous task instance state, rerun regardless if task already succeeded/failed",
action="store_true",
)
ARG_RAW = Arg(
    ("-r", "--raw"),
    help=argparse.SUPPRESS,
    action="store_true",
)
ARG_IGNORE_ALL_DEPENDENCIES = Arg(
("-A", "--ignore-all-dependencies"),
help="Ignores all non-critical dependencies, including ignore_ti_state and ignore_task_deps",
action="store_true",
)
# TODO(aoen): ignore_dependencies is a poor choice of name here because it is too
# vague (e.g. a task being in the appropriate state to be run is also a dependency
# but is not ignored by this flag), the name 'ignore_task_dependencies' is
# slightly better (as it ignores all dependencies that are specific to the task),
# so deprecate the old command name and use this instead.
ARG_IGNORE_DEPENDENCIES = Arg(
("-i", "--ignore-dependencies"),
help="Ignore task-specific dependencies, e.g. upstream, depends_on_past, and retry delay dependencies",
action="store_true",
)
ARG_IGNORE_DEPENDS_ON_PAST = Arg(
("-I", "--ignore-depends-on-past"),
help="Deprecated -- use `--depends-on-past ignore` instead. "
"Ignore depends_on_past dependencies (but respect upstream dependencies)",
action="store_true",
)
ARG_DEPENDS_ON_PAST = Arg(
    ("-d", "--depends-on-past"),
    help="Determine how Airflow should deal with past dependencies. The default action is `check`, Airflow "
    "will check if the past dependencies are met for the tasks having `depends_on_past=True` before run "
    "them, if `ignore` is provided, the past dependencies will be ignored, if `wait` is provided and "
    "`depends_on_past=True`, Airflow will wait the past dependencies until they are met before running or "
    "skipping the task",
    # A tuple keeps the choices in declaration order wherever they are listed;
    # a set has no defined iteration order.
    choices=("check", "ignore", "wait"),
    default="check",
)
ARG_SHIP_DAG = Arg(
("--ship-dag",), help="Pickles (serializes) the DAG and ships it to the worker", action="store_true"
)
ARG_PICKLE = Arg(("-p", "--pickle"), help="Serialized pickle object of the entire dag (used internally)")
ARG_JOB_ID = Arg(("-j", "--job-id"), help=argparse.SUPPRESS)
ARG_CFG_PATH = Arg(("--cfg-path",), help="Path to config file to use instead of airflow.cfg")
ARG_MAP_INDEX = Arg(("--map-index",), type=int, default=-1, help="Mapped task index")
# database
ARG_MIGRATION_TIMEOUT = Arg(
("-t", "--migration-wait-timeout"),
help="timeout to wait for db to migrate ",
type=int,
default=60,
)
ARG_DB_RESERIALIZE_DAGS = Arg(
("--no-reserialize-dags",),
# Not intended for user, so dont show in help
help=argparse.SUPPRESS,
action="store_false",
default=True,
dest="reserialize_dags",
)
ARG_DB_VERSION__UPGRADE = Arg(
("-n", "--to-version"),
help=(
"(Optional) The airflow version to upgrade to. Note: must provide either "
"`--to-revision` or `--to-version`."
),
)
ARG_DB_REVISION__UPGRADE = Arg(
("-r", "--to-revision"),
help="(Optional) If provided, only run migrations up to and including this Alembic revision.",
)
ARG_DB_VERSION__DOWNGRADE = Arg(
("-n", "--to-version"),
help="(Optional) If provided, only run migrations up to this version.",
)
ARG_DB_FROM_VERSION = Arg(
("--from-version",),
help="(Optional) If generating sql, may supply a *from* version",
)
ARG_DB_REVISION__DOWNGRADE = Arg(
("-r", "--to-revision"),
help="The Alembic revision to downgrade to. Note: must provide either `--to-revision` or `--to-version`.",
)
ARG_DB_FROM_REVISION = Arg(
("--from-revision",),
help="(Optional) If generating sql, may supply a *from* Alembic revision",
)
ARG_DB_SQL_ONLY = Arg(
("-s", "--show-sql-only"),
help="Don't actually run migrations; just print out sql scripts for offline migration. "
"Required if using either `--from-revision` or `--from-version`.",
action="store_true",
default=False,
)
ARG_DB_SKIP_INIT = Arg(
("-s", "--skip-init"),
help="Only remove tables; do not perform db init.",
action="store_true",
default=False,
)
# webserver
ARG_PORT = Arg(
("-p", "--port"),
default=conf.get("webserver", "WEB_SERVER_PORT"),
type=int,
help="The port on which to run the server",
)
ARG_SSL_CERT = Arg(
("--ssl-cert",),
default=conf.get("webserver", "WEB_SERVER_SSL_CERT"),
help="Path to the SSL certificate for the webserver",
)
ARG_SSL_KEY = Arg(
("--ssl-key",),
default=conf.get("webserver", "WEB_SERVER_SSL_KEY"),
help="Path to the key to use with the SSL certificate",
)
ARG_WORKERS = Arg(
("-w", "--workers"),
default=conf.get("webserver", "WORKERS"),
type=int,
help="Number of workers to run the webserver on",
)
ARG_WORKERCLASS = Arg(
("-k", "--workerclass"),
default=conf.get("webserver", "WORKER_CLASS"),
choices=["sync", "eventlet", "gevent", "tornado"],
help="The worker class to use for Gunicorn",
)
ARG_WORKER_TIMEOUT = Arg(
("-t", "--worker-timeout"),
default=conf.get("webserver", "WEB_SERVER_WORKER_TIMEOUT"),
type=int,
help="The timeout for waiting on webserver workers",
)
ARG_HOSTNAME = Arg(
("-H", "--hostname"),
default=conf.get("webserver", "WEB_SERVER_HOST"),
help="Set the hostname on which to run the web server",
)
ARG_DEBUG = Arg(
("-d", "--debug"), help="Use the server that ships with Flask in debug mode", action="store_true"
)
ARG_ACCESS_LOGFILE = Arg(
("-A", "--access-logfile"),
default=conf.get("webserver", "ACCESS_LOGFILE"),
help="The logfile to store the webserver access log. Use '-' to print to stdout",
)
ARG_ERROR_LOGFILE = Arg(
("-E", "--error-logfile"),
default=conf.get("webserver", "ERROR_LOGFILE"),
help="The logfile to store the webserver error log. Use '-' to print to stderr",
)
ARG_ACCESS_LOGFORMAT = Arg(
("-L", "--access-logformat"),
default=conf.get("webserver", "ACCESS_LOGFORMAT"),
help="The access log format for gunicorn logs",
)
# internal-api
ARG_INTERNAL_API_PORT = Arg(
("-p", "--port"),
default=9080,
type=int,
help="The port on which to run the server",
)
ARG_INTERNAL_API_WORKERS = Arg(
    ("-w", "--workers"),
    default=4,
    type=int,
    help="Number of workers to run the Internal API on",
)
ARG_INTERNAL_API_WORKERCLASS = Arg(
("-k", "--workerclass"),
default="sync",
choices=["sync", "eventlet", "gevent", "tornado"],
help="The worker class to use for Gunicorn",
)
ARG_INTERNAL_API_WORKER_TIMEOUT = Arg(
("-t", "--worker-timeout"),
default=120,
type=int,
help="The timeout for waiting on Internal API workers",
)
ARG_INTERNAL_API_HOSTNAME = Arg(
("-H", "--hostname"),
default="0.0.0.0",
help="Set the hostname on which to run the web server",
)
ARG_INTERNAL_API_ACCESS_LOGFILE = Arg(
("-A", "--access-logfile"),
help="The logfile to store the access log. Use '-' to print to stdout",
)
ARG_INTERNAL_API_ERROR_LOGFILE = Arg(
("-E", "--error-logfile"),
help="The logfile to store the error log. Use '-' to print to stderr",
)
ARG_INTERNAL_API_ACCESS_LOGFORMAT = Arg(
("-L", "--access-logformat"),
help="The access log format for gunicorn logs",
)
# scheduler
ARG_NUM_RUNS = Arg(
("-n", "--num-runs"),
default=conf.getint("scheduler", "num_runs"),
type=int,
help="Set the number of runs to execute before exiting",
)
ARG_DO_PICKLE = Arg(
("-p", "--do-pickle"),
default=False,
help=(
"Attempt to pickle the DAG object to send over "
"to the workers, instead of letting workers run their version "
"of the code"
),
action="store_true",
)
# worker
ARG_QUEUES = Arg(
("-q", "--queues"),
help="Comma delimited list of queues to serve",
default=conf.get("operators", "DEFAULT_QUEUE"),
)
ARG_CONCURRENCY = Arg(
("-c", "--concurrency"),
type=int,
help="The number of worker processes",
default=conf.get("celery", "worker_concurrency"),
)
ARG_CELERY_HOSTNAME = Arg(
("-H", "--celery-hostname"),
help="Set the hostname of celery worker if you have multiple workers on a single machine",
)
ARG_UMASK = Arg(
("-u", "--umask"),
help="Set the umask of celery worker in daemon mode",
)
ARG_WITHOUT_MINGLE = Arg(
("--without-mingle",),
default=False,
help="Don't synchronize with other workers at start-up",
action="store_true",
)
ARG_WITHOUT_GOSSIP = Arg(
("--without-gossip",),
default=False,
help="Don't subscribe to other workers events",
action="store_true",
)
# flower
ARG_BROKER_API = Arg(("-a", "--broker-api"), help="Broker API")
ARG_FLOWER_HOSTNAME = Arg(
("-H", "--hostname"),
default=conf.get("celery", "FLOWER_HOST"),
help="Set the hostname on which to run the server",
)
ARG_FLOWER_PORT = Arg(
("-p", "--port"),
default=conf.get("celery", "FLOWER_PORT"),
type=int,
help="The port on which to run the server",
)
ARG_FLOWER_CONF = Arg(("-c", "--flower-conf"), help="Configuration file for flower")
ARG_FLOWER_URL_PREFIX = Arg(
("-u", "--url-prefix"), default=conf.get("celery", "FLOWER_URL_PREFIX"), help="URL prefix for Flower"
)
ARG_FLOWER_BASIC_AUTH = Arg(
("-A", "--basic-auth"),
default=conf.get("celery", "FLOWER_BASIC_AUTH"),
help=(
"Securing Flower with Basic Authentication. "
"Accepts user:password pairs separated by a comma. "
"Example: flower_basic_auth = user1:password1,user2:password2"
),
)
ARG_TASK_PARAMS = Arg(("-t", "--task-params"), help="Sends a JSON params dict to the task")
ARG_POST_MORTEM = Arg(
("-m", "--post-mortem"), action="store_true", help="Open debugger on uncaught exception"
)
ARG_ENV_VARS = Arg(
("--env-vars",),
help="Set env var in both parsing time and runtime for each of entry supplied in a JSON dict",
type=json.loads,
)
# connections
ARG_CONN_ID = Arg(("conn_id",), help="Connection id, required to get/add/delete/test a connection", type=str)
ARG_CONN_ID_FILTER = Arg(
("--conn-id",), help="If passed, only items with the specified connection ID will be displayed", type=str
)
ARG_CONN_URI = Arg(
("--conn-uri",), help="Connection URI, required to add a connection without conn_type", type=str
)
ARG_CONN_JSON = Arg(
("--conn-json",), help="Connection JSON, required to add a connection using JSON representation", type=str
)
ARG_CONN_TYPE = Arg(
("--conn-type",), help="Connection type, required to add a connection without conn_uri", type=str
)
ARG_CONN_DESCRIPTION = Arg(
("--conn-description",), help="Connection description, optional when adding a connection", type=str
)
ARG_CONN_HOST = Arg(("--conn-host",), help="Connection host, optional when adding a connection", type=str)
ARG_CONN_LOGIN = Arg(("--conn-login",), help="Connection login, optional when adding a connection", type=str)
ARG_CONN_PASSWORD = Arg(
("--conn-password",), help="Connection password, optional when adding a connection", type=str
)
ARG_CONN_SCHEMA = Arg(
("--conn-schema",), help="Connection schema, optional when adding a connection", type=str
)
ARG_CONN_PORT = Arg(("--conn-port",), help="Connection port, optional when adding a connection", type=str)
ARG_CONN_EXTRA = Arg(
("--conn-extra",), help="Connection `Extra` field, optional when adding a connection", type=str
)
ARG_CONN_EXPORT = Arg(
("file",),
help="Output file path for exporting the connections",
type=argparse.FileType("w", encoding="UTF-8"),
)
ARG_CONN_EXPORT_FORMAT = Arg(
("--format",),
help="Deprecated -- use `--file-format` instead. File format to use for the export.",
type=str,
choices=["json", "yaml", "env"],
)
ARG_CONN_EXPORT_FILE_FORMAT = Arg(
("--file-format",), help="File format for the export", type=str, choices=["json", "yaml", "env"]
)
ARG_CONN_SERIALIZATION_FORMAT = Arg(
("--serialization-format",),
help="When exporting as `.env` format, defines how connections should be serialized. Default is `uri`.",
type=string_lower_type,
choices=["json", "uri"],
)
ARG_CONN_IMPORT = Arg(("file",), help="Import connections from a file")
ARG_CONN_OVERWRITE = Arg(
("--overwrite",),
help="Overwrite existing entries if a conflict occurs",
required=False,
action="store_true",
)
# providers
ARG_PROVIDER_NAME = Arg(
("provider_name",), help="Provider name, required to get provider information", type=str
)
ARG_FULL = Arg(
("-f", "--full"),
help="Full information about the provider, including documentation information.",
required=False,
action="store_true",
)
# users
ARG_USERNAME = Arg(("-u", "--username"), help="Username of the user", required=True, type=str)
ARG_USERNAME_OPTIONAL = Arg(("-u", "--username"), help="Username of the user", type=str)
ARG_FIRSTNAME = Arg(("-f", "--firstname"), help="First name of the user", required=True, type=str)
ARG_LASTNAME = Arg(("-l", "--lastname"), help="Last name of the user", required=True, type=str)
ARG_ROLE = Arg(
("-r", "--role"),
help="Role of the user. Existing roles include Admin, User, Op, Viewer, and Public",
required=True,
type=str,
)
ARG_EMAIL = Arg(("-e", "--email"), help="Email of the user", required=True, type=str)
ARG_EMAIL_OPTIONAL = Arg(("-e", "--email"), help="Email of the user", type=str)
ARG_PASSWORD = Arg(
("-p", "--password"),
help="Password of the user, required to create a user without --use-random-password",
type=str,
)
ARG_USE_RANDOM_PASSWORD = Arg(
("--use-random-password",),
help="Do not prompt for password. Use random string instead."
" Required to create a user without --password ",
default=False,
action="store_true",
)
ARG_USER_IMPORT = Arg(
("import",),
metavar="FILEPATH",
help="Import users from JSON file. Example format::\n"
+ textwrap.indent(
textwrap.dedent(
"""
[
{
"email": "foo@bar.org",
"firstname": "Jon",
"lastname": "Doe",
"roles": ["Public"],
"username": "jondoe"
}
]"""
),
" " * 4,
),
)
ARG_USER_EXPORT = Arg(("export",), metavar="FILEPATH", help="Export all users to JSON file")
# roles
ARG_CREATE_ROLE = Arg(("-c", "--create"), help="Create a new role", action="store_true")
ARG_LIST_ROLES = Arg(("-l", "--list"), help="List roles", action="store_true")
ARG_ROLES = Arg(("role",), help="The name of a role", nargs="*")
ARG_PERMISSIONS = Arg(("-p", "--permission"), help="Show role permissions", action="store_true")
ARG_ROLE_RESOURCE = Arg(("-r", "--resource"), help="The name of permissions", nargs="*", required=True)
ARG_ROLE_ACTION = Arg(("-a", "--action"), help="The action of permissions", nargs="*")
ARG_ROLE_ACTION_REQUIRED = Arg(("-a", "--action"), help="The action of permissions", nargs="*", required=True)
ARG_AUTOSCALE = Arg(("-a", "--autoscale"), help="Minimum and Maximum number of worker to autoscale")
ARG_SKIP_SERVE_LOGS = Arg(
("-s", "--skip-serve-logs"),
default=False,
help="Don't start the serve logs process along with the workers",
action="store_true",
)
ARG_ROLE_IMPORT = Arg(("file",), help="Import roles from JSON file", nargs=None)
ARG_ROLE_EXPORT = Arg(("file",), help="Export all roles to JSON file", nargs=None)
ARG_ROLE_EXPORT_FMT = Arg(
("-p", "--pretty"),
help="Format output JSON file by sorting role names and indenting by 4 spaces",
action="store_true",
)
# info
ARG_ANONYMIZE = Arg(
("--anonymize",),
help="Minimize any personal identifiable information. Use it when sharing output with others.",
action="store_true",
)
ARG_FILE_IO = Arg(
("--file-io",), help="Send output to file.io service and returns link.", action="store_true"
)
# config
ARG_SECTION = Arg(
("section",),
help="The section name",
)
ARG_OPTION = Arg(
("option",),
help="The option name",
)
ARG_OPTIONAL_SECTION = Arg(
("--section",),
help="The section name",
)
# kubernetes cleanup-pods
ARG_NAMESPACE = Arg(
("--namespace",),
default=conf.get("kubernetes_executor", "namespace"),
help="Kubernetes Namespace. Default value is `[kubernetes] namespace` in configuration.",
)
# NOTE(review): the help text states a 5-minute minimum, but the validator used here,
# positive_int(allow_zero=False), only rejects values below 1. Confirm the 5-minute
# floor is enforced by the command that consumes this argument.
ARG_MIN_PENDING_MINUTES = Arg(
    ("--min-pending-minutes",),
    default=30,
    type=positive_int(allow_zero=False),
    help=(
        "Pending pods created before the time interval are to be cleaned up, "
        "measured in minutes. Default value is 30(m). The minimum value is 5(m)."
    ),
)
# jobs check
ARG_JOB_TYPE_FILTER = Arg(
("--job-type",),
choices=("BackfillJob", "LocalTaskJob", "SchedulerJob", "TriggererJob", "DagProcessorJob"),
action="store",
help="The type of job(s) that will be checked.",
)
ARG_JOB_HOSTNAME_FILTER = Arg(
("--hostname",),
default=None,
type=str,
help="The hostname of job(s) that will be checked.",
)
ARG_JOB_HOSTNAME_CALLABLE_FILTER = Arg(
("--local",),
action="store_true",
help="If passed, this command will only show jobs from the local host "
"(those with a hostname matching what `hostname_callable` returns).",
)
ARG_JOB_LIMIT = Arg(
("--limit",),
default=1,
type=positive_int(allow_zero=True),
help="The number of recent jobs that will be checked. To disable limit, set 0. ",
)
ARG_ALLOW_MULTIPLE = Arg(
("--allow-multiple",),
action="store_true",
help="If passed, this command will be successful even if multiple matching alive jobs are found.",
)
# sync-perm
ARG_INCLUDE_DAGS = Arg(
("--include-dags",), help="If passed, DAG specific permissions will also be synced.", action="store_true"
)
# triggerer
ARG_CAPACITY = Arg(
("--capacity",),
type=positive_int(allow_zero=False),
help="The maximum number of triggers that a Triggerer will run at one time.",
)
# reserialize
ARG_CLEAR_ONLY = Arg(
("--clear-only",),
action="store_true",
help="If passed, serialized DAGs will be cleared but not reserialized.",
)
# Flags that describe a connection field by field (type, description, host, login,
# password, schema, port).
# NOTE(review): presumably the alternative to --conn-uri / --conn-json; ARG_CONN_EXTRA
# is not part of this list — confirm that is intentional.
ALTERNATIVE_CONN_SPECS_ARGS = [
    ARG_CONN_TYPE,
    ARG_CONN_DESCRIPTION,
    ARG_CONN_HOST,
    ARG_CONN_LOGIN,
    ARG_CONN_PASSWORD,
    ARG_CONN_SCHEMA,
    ARG_CONN_PORT,
]
class ActionCommand(NamedTuple):
    """Single CLI command.

    Immutable record describing one executable command: its name, help text,
    the callable that implements it and the arguments it accepts.
    """

    # Command name (e.g. ``list`` or ``webserver``).
    name: str
    # Short help text for the command.
    help: str
    # Callable implementing the command; in this module it is built with ``lazy_load_command``.
    func: Callable
    # Arguments accepted by the command.
    args: Iterable[Arg]
    # Optional longer description; ``None`` when not provided.
    description: str | None = None
    # Optional epilog text (used in this module for usage examples); ``None`` when not provided.
    epilog: str | None = None
class GroupCommand(NamedTuple):
    """CLI command with subcommands.

    Immutable record describing a command group (e.g. ``dags``) that only
    serves as a container for further commands.
    """

    # Group name (e.g. ``dags`` or ``db``).
    name: str
    # Short help text for the group.
    help: str
    # Commands nested under this group.
    subcommands: Iterable
    # Optional longer description; ``None`` when not provided.
    description: str | None = None
    # Optional epilog text; ``None`` when not provided.
    epilog: str | None = None
# Type alias covering both kinds of command definition: a leaf command or a command group.
CLICommand = Union[ActionCommand, GroupCommand]
# Subcommands of the ``dags`` group (registered in ``core_commands``). Every implementation
# lives in ``airflow.cli.commands.dag_command`` and is imported lazily, on first call, through
# ``lazy_load_command``. ``dag_cli_commands`` reuses a subset of these entries by name.
DAGS_COMMANDS = (
    ActionCommand(
        name="details",
        help="Get DAG details given a DAG id",
        func=lazy_load_command("airflow.cli.commands.dag_command.dag_details"),
        args=(ARG_DAG_ID, ARG_OUTPUT, ARG_VERBOSE),
    ),
    ActionCommand(
        name="list",
        help="List all the DAGs",
        func=lazy_load_command("airflow.cli.commands.dag_command.dag_list_dags"),
        args=(ARG_SUBDIR, ARG_OUTPUT, ARG_VERBOSE),
    ),
    ActionCommand(
        name="list-import-errors",
        help="List all the DAGs that have import errors",
        func=lazy_load_command("airflow.cli.commands.dag_command.dag_list_import_errors"),
        args=(ARG_SUBDIR, ARG_OUTPUT, ARG_VERBOSE),
    ),
    ActionCommand(
        name="report",
        help="Show DagBag loading report",
        func=lazy_load_command("airflow.cli.commands.dag_command.dag_report"),
        args=(ARG_SUBDIR, ARG_OUTPUT, ARG_VERBOSE),
    ),
    ActionCommand(
        name="list-runs",
        help="List DAG runs given a DAG id",
        description=(
            "List DAG runs given a DAG id. If state option is given, it will only search for all the "
            "dagruns with the given state. If no_backfill option is given, it will filter out all "
            "backfill dagruns for given dag id. If start_date is given, it will filter out all the "
            "dagruns that were executed before this date. If end_date is given, it will filter out "
            "all the dagruns that were executed after this date. "
        ),
        func=lazy_load_command("airflow.cli.commands.dag_command.dag_list_dag_runs"),
        # This entry uses ARG_DAG_ID_REQ_FLAG rather than ARG_DAG_ID, so ``_remove_dag_id_opt``
        # (which filters on identity with ARG_DAG_ID) leaves its arguments unchanged.
        args=(
            ARG_DAG_ID_REQ_FLAG,
            ARG_NO_BACKFILL,
            ARG_STATE,
            ARG_OUTPUT,
            ARG_VERBOSE,
            ARG_START_DATE,
            ARG_END_DATE,
        ),
    ),
    ActionCommand(
        name="list-jobs",
        help="List the jobs",
        func=lazy_load_command("airflow.cli.commands.dag_command.dag_list_jobs"),
        args=(ARG_DAG_ID_OPT, ARG_STATE, ARG_LIMIT, ARG_OUTPUT, ARG_VERBOSE),
    ),
    ActionCommand(
        name="state",
        help="Get the status of a dag run",
        func=lazy_load_command("airflow.cli.commands.dag_command.dag_state"),
        args=(ARG_DAG_ID, ARG_EXECUTION_DATE, ARG_SUBDIR, ARG_VERBOSE),
    ),
    ActionCommand(
        name="next-execution",
        help="Get the next execution datetimes of a DAG",
        description=(
            "Get the next execution datetimes of a DAG. It returns one execution unless the "
            "num-executions option is given"
        ),
        func=lazy_load_command("airflow.cli.commands.dag_command.dag_next_execution"),
        args=(ARG_DAG_ID, ARG_SUBDIR, ARG_NUM_EXECUTIONS, ARG_VERBOSE),
    ),
    ActionCommand(
        name="pause",
        help="Pause a DAG",
        func=lazy_load_command("airflow.cli.commands.dag_command.dag_pause"),
        args=(ARG_DAG_ID, ARG_SUBDIR, ARG_VERBOSE),
    ),
    ActionCommand(
        name="unpause",
        help="Resume a paused DAG",
        func=lazy_load_command("airflow.cli.commands.dag_command.dag_unpause"),
        args=(ARG_DAG_ID, ARG_SUBDIR, ARG_VERBOSE),
    ),
    ActionCommand(
        name="trigger",
        help="Trigger a DAG run",
        func=lazy_load_command("airflow.cli.commands.dag_command.dag_trigger"),
        args=(
            ARG_DAG_ID,
            ARG_SUBDIR,
            ARG_RUN_ID,
            ARG_CONF,
            ARG_EXEC_DATE,
            ARG_VERBOSE,
            ARG_REPLACE_MICRO,
            ARG_OUTPUT,
        ),
    ),
    ActionCommand(
        name="delete",
        help="Delete all DB records related to the specified DAG",
        func=lazy_load_command("airflow.cli.commands.dag_command.dag_delete"),
        args=(ARG_DAG_ID, ARG_YES, ARG_VERBOSE),
    ),
    ActionCommand(
        name="show",
        help="Displays DAG's tasks with their dependencies",
        # The description embeds explicit newlines and ready-to-run example commands.
        description=(
            "The --imgcat option only works in iTerm.\n"
            "\n"
            "For more information, see: https://www.iterm2.com/documentation-images.html\n"
            "\n"
            "The --save option saves the result to the indicated file.\n"
            "\n"
            "The file format is determined by the file extension. "
            "For more information about supported "
            "format, see: https://www.graphviz.org/doc/info/output.html\n"
            "\n"
            "If you want to create a PNG file then you should execute the following command:\n"
            "airflow dags show <DAG_ID> --save output.png\n"
            "\n"
            "If you want to create a DOT file then you should execute the following command:\n"
            "airflow dags show <DAG_ID> --save output.dot\n"
        ),
        func=lazy_load_command("airflow.cli.commands.dag_command.dag_show"),
        args=(
            ARG_DAG_ID,
            ARG_SUBDIR,
            ARG_SAVE,
            ARG_IMGCAT,
            ARG_VERBOSE,
        ),
    ),
    ActionCommand(
        name="show-dependencies",
        help="Displays DAGs with their dependencies",
        description=(
            "The --imgcat option only works in iTerm.\n"
            "\n"
            "For more information, see: https://www.iterm2.com/documentation-images.html\n"
            "\n"
            "The --save option saves the result to the indicated file.\n"
            "\n"
            "The file format is determined by the file extension. "
            "For more information about supported "
            "format, see: https://www.graphviz.org/doc/info/output.html\n"
            "\n"
            "If you want to create a PNG file then you should execute the following command:\n"
            "airflow dags show-dependencies --save output.png\n"
            "\n"
            "If you want to create a DOT file then you should execute the following command:\n"
            "airflow dags show-dependencies --save output.dot\n"
        ),
        func=lazy_load_command("airflow.cli.commands.dag_command.dag_dependencies_show"),
        args=(
            ARG_SUBDIR,
            ARG_SAVE,
            ARG_IMGCAT,
            ARG_VERBOSE,
        ),
    ),
    ActionCommand(
        name="backfill",
        help="Run subsections of a DAG for a specified date range",
        description=(
            "Run subsections of a DAG for a specified date range. If reset_dag_run option is used, "
            "backfill will first prompt users whether airflow should clear all the previous dag_run and "
            "task_instances within the backfill date range. If rerun_failed_tasks is used, backfill "
            "will auto re-run the previous failed task instances within the backfill date range"
        ),
        func=lazy_load_command("airflow.cli.commands.dag_command.dag_backfill"),
        args=(
            ARG_DAG_ID,
            ARG_TASK_REGEX,
            ARG_START_DATE,
            ARG_END_DATE,
            ARG_MARK_SUCCESS,
            ARG_LOCAL,
            ARG_DONOT_PICKLE,
            ARG_YES,
            ARG_CONTINUE_ON_FAILURES,
            ARG_DISABLE_RETRY,
            ARG_BF_IGNORE_DEPENDENCIES,
            ARG_BF_IGNORE_FIRST_DEPENDS_ON_PAST,
            ARG_SUBDIR,
            ARG_POOL,
            ARG_DELAY_ON_LIMIT,
            ARG_DRY_RUN,
            ARG_VERBOSE,
            ARG_CONF,
            ARG_RESET_DAG_RUN,
            ARG_RERUN_FAILED_TASKS,
            ARG_RUN_BACKWARDS,
            ARG_TREAT_DAG_AS_REGEX,
        ),
    ),
    ActionCommand(
        name="test",
        help="Execute one single DagRun",
        description=(
            "Execute one single DagRun for a given DAG and execution date.\n"
            "\n"
            "The --imgcat-dagrun option only works in iTerm.\n"
            "\n"
            "For more information, see: https://www.iterm2.com/documentation-images.html\n"
            "\n"
            "If --save-dagrun is used, then, after completing the backfill, saves the diagram "
            "for current DAG Run to the indicated file.\n"
            "The file format is determined by the file extension. "
            "For more information about supported format, "
            "see: https://www.graphviz.org/doc/info/output.html\n"
            "\n"
            "If you want to create a PNG file then you should execute the following command:\n"
            "airflow dags test <DAG_ID> <EXECUTION_DATE> --save-dagrun output.png\n"
            "\n"
            "If you want to create a DOT file then you should execute the following command:\n"
            "airflow dags test <DAG_ID> <EXECUTION_DATE> --save-dagrun output.dot\n"
        ),
        func=lazy_load_command("airflow.cli.commands.dag_command.dag_test"),
        args=(
            ARG_DAG_ID,
            ARG_EXECUTION_DATE_OPTIONAL,
            ARG_CONF,
            ARG_SUBDIR,
            ARG_SHOW_DAGRUN,
            ARG_IMGCAT_DAGRUN,
            ARG_SAVE_DAGRUN,
            ARG_VERBOSE,
        ),
    ),
    ActionCommand(
        name="reserialize",
        help="Reserialize all DAGs by parsing the DagBag files",
        description=(
            "Drop all serialized dags from the metadata DB. This will cause all DAGs to be reserialized "
            "from the DagBag folder. This can be helpful if your serialized DAGs get out of sync with the "
            "version of Airflow that you are running."
        ),
        func=lazy_load_command("airflow.cli.commands.dag_command.dag_reserialize"),
        args=(
            ARG_CLEAR_ONLY,
            ARG_SUBDIR,
            ARG_VERBOSE,
        ),
    ),
)
# Subcommands of the ``tasks`` group (registered in ``core_commands``). Implementations live
# in ``airflow.cli.commands.task_command`` and are imported lazily through
# ``lazy_load_command``. ``dag_cli_commands`` reuses the "list", "test" and "run" entries.
TASKS_COMMANDS = (
    ActionCommand(
        name="list",
        help="List the tasks within a DAG",
        func=lazy_load_command("airflow.cli.commands.task_command.task_list"),
        args=(ARG_DAG_ID, ARG_TREE, ARG_SUBDIR, ARG_VERBOSE),
    ),
    ActionCommand(
        name="clear",
        help="Clear a set of task instance, as if they never ran",
        func=lazy_load_command("airflow.cli.commands.task_command.task_clear"),
        args=(
            ARG_DAG_ID,
            ARG_TASK_REGEX,
            ARG_START_DATE,
            ARG_END_DATE,
            ARG_SUBDIR,
            ARG_UPSTREAM,
            ARG_DOWNSTREAM,
            ARG_YES,
            ARG_ONLY_FAILED,
            ARG_ONLY_RUNNING,
            ARG_EXCLUDE_SUBDAGS,
            ARG_EXCLUDE_PARENTDAG,
            ARG_DAG_REGEX,
            ARG_VERBOSE,
        ),
    ),
    ActionCommand(
        name="state",
        help="Get the status of a task instance",
        func=lazy_load_command("airflow.cli.commands.task_command.task_state"),
        args=(
            ARG_DAG_ID,
            ARG_TASK_ID,
            ARG_EXECUTION_DATE_OR_RUN_ID,
            ARG_SUBDIR,
            ARG_VERBOSE,
            ARG_MAP_INDEX,
        ),
    ),
    ActionCommand(
        name="failed-deps",
        help="Returns the unmet dependencies for a task instance",
        description=(
            "Returns the unmet dependencies for a task instance from the perspective of the scheduler. "
            "In other words, why a task instance doesn't get scheduled and then queued by the scheduler, "
            "and then run by an executor."
        ),
        func=lazy_load_command("airflow.cli.commands.task_command.task_failed_deps"),
        args=(ARG_DAG_ID, ARG_TASK_ID, ARG_EXECUTION_DATE_OR_RUN_ID, ARG_SUBDIR, ARG_MAP_INDEX, ARG_VERBOSE),
    ),
    ActionCommand(
        name="render",
        help="Render a task instance's template(s)",
        func=lazy_load_command("airflow.cli.commands.task_command.task_render"),
        args=(
            ARG_DAG_ID,
            ARG_TASK_ID,
            ARG_EXECUTION_DATE_OR_RUN_ID,
            ARG_SUBDIR,
            ARG_VERBOSE,
            ARG_MAP_INDEX,
        ),
    ),
    ActionCommand(
        name="run",
        help="Run a single task instance",
        func=lazy_load_command("airflow.cli.commands.task_command.task_run"),
        args=(
            ARG_DAG_ID,
            ARG_TASK_ID,
            ARG_EXECUTION_DATE_OR_RUN_ID,
            ARG_SUBDIR,
            ARG_MARK_SUCCESS,
            ARG_FORCE,
            ARG_POOL,
            ARG_CFG_PATH,
            ARG_LOCAL,
            ARG_RAW,
            ARG_IGNORE_ALL_DEPENDENCIES,
            ARG_IGNORE_DEPENDENCIES,
            ARG_IGNORE_DEPENDS_ON_PAST,
            ARG_DEPENDS_ON_PAST,
            ARG_SHIP_DAG,
            ARG_PICKLE,
            ARG_JOB_ID,
            ARG_INTERACTIVE,
            ARG_SHUT_DOWN_LOGGING,
            ARG_MAP_INDEX,
            ARG_VERBOSE,
        ),
    ),
    ActionCommand(
        name="test",
        help="Test a task instance",
        description=(
            "Test a task instance. This will run a task without checking for dependencies or recording "
            "its state in the database"
        ),
        func=lazy_load_command("airflow.cli.commands.task_command.task_test"),
        args=(
            ARG_DAG_ID,
            ARG_TASK_ID,
            ARG_EXECUTION_DATE_OR_RUN_ID_OPTIONAL,
            ARG_SUBDIR,
            ARG_DRY_RUN,
            ARG_TASK_PARAMS,
            ARG_POST_MORTEM,
            ARG_ENV_VARS,
            ARG_MAP_INDEX,
            ARG_VERBOSE,
        ),
    ),
    ActionCommand(
        name="states-for-dag-run",
        help="Get the status of all task instances in a dag run",
        func=lazy_load_command("airflow.cli.commands.task_command.task_states_for_dag_run"),
        args=(ARG_DAG_ID, ARG_EXECUTION_DATE_OR_RUN_ID, ARG_OUTPUT, ARG_VERBOSE),
    ),
)
# Subcommands of the ``pools`` group (registered in ``core_commands``). Implementations live
# in ``airflow.cli.commands.pool_command`` and are imported lazily through ``lazy_load_command``.
POOLS_COMMANDS = (
    ActionCommand(
        name="list",
        help="List pools",
        func=lazy_load_command("airflow.cli.commands.pool_command.pool_list"),
        args=(ARG_OUTPUT, ARG_VERBOSE),
    ),
    ActionCommand(
        name="get",
        help="Get pool size",
        func=lazy_load_command("airflow.cli.commands.pool_command.pool_get"),
        args=(ARG_POOL_NAME, ARG_OUTPUT, ARG_VERBOSE),
    ),
    ActionCommand(
        name="set",
        help="Configure pool",
        func=lazy_load_command("airflow.cli.commands.pool_command.pool_set"),
        args=(ARG_POOL_NAME, ARG_POOL_SLOTS, ARG_POOL_DESCRIPTION, ARG_OUTPUT, ARG_VERBOSE),
    ),
    ActionCommand(
        name="delete",
        help="Delete pool",
        func=lazy_load_command("airflow.cli.commands.pool_command.pool_delete"),
        args=(ARG_POOL_NAME, ARG_OUTPUT, ARG_VERBOSE),
    ),
    ActionCommand(
        name="import",
        help="Import pools",
        func=lazy_load_command("airflow.cli.commands.pool_command.pool_import"),
        args=(ARG_POOL_IMPORT, ARG_VERBOSE),
    ),
    ActionCommand(
        name="export",
        help="Export all pools",
        func=lazy_load_command("airflow.cli.commands.pool_command.pool_export"),
        args=(ARG_POOL_EXPORT, ARG_VERBOSE),
    ),
)
# Subcommands of the ``variables`` group (registered in ``core_commands``). Implementations
# live in ``airflow.cli.commands.variable_command`` and are imported lazily through
# ``lazy_load_command``.
VARIABLES_COMMANDS = (
    ActionCommand(
        name="list",
        help="List variables",
        func=lazy_load_command("airflow.cli.commands.variable_command.variables_list"),
        args=(ARG_OUTPUT, ARG_VERBOSE),
    ),
    ActionCommand(
        name="get",
        help="Get variable",
        func=lazy_load_command("airflow.cli.commands.variable_command.variables_get"),
        args=(ARG_VAR, ARG_DESERIALIZE_JSON, ARG_DEFAULT, ARG_VERBOSE),
    ),
    ActionCommand(
        name="set",
        help="Set variable",
        func=lazy_load_command("airflow.cli.commands.variable_command.variables_set"),
        args=(ARG_VAR, ARG_VAR_VALUE, ARG_SERIALIZE_JSON, ARG_VERBOSE),
    ),
    ActionCommand(
        name="delete",
        help="Delete variable",
        func=lazy_load_command("airflow.cli.commands.variable_command.variables_delete"),
        args=(ARG_VAR, ARG_VERBOSE),
    ),
    ActionCommand(
        name="import",
        help="Import variables",
        func=lazy_load_command("airflow.cli.commands.variable_command.variables_import"),
        args=(ARG_VAR_IMPORT, ARG_VERBOSE),
    ),
    ActionCommand(
        name="export",
        help="Export all variables",
        func=lazy_load_command("airflow.cli.commands.variable_command.variables_export"),
        args=(ARG_VAR_EXPORT, ARG_VERBOSE),
    ),
)
# Subcommands of the ``db`` group (registered in ``core_commands``). Implementations live in
# ``airflow.cli.commands.db_command`` and are imported lazily through ``lazy_load_command``.
DB_COMMANDS = (
    ActionCommand(
        name="init",
        help="Initialize the metadata database",
        func=lazy_load_command("airflow.cli.commands.db_command.initdb"),
        args=(ARG_VERBOSE,),
    ),
    ActionCommand(
        name="check-migrations",
        help="Check if migration have finished",
        description="Check if migration have finished (or continually check until timeout)",
        func=lazy_load_command("airflow.cli.commands.db_command.check_migrations"),
        args=(ARG_MIGRATION_TIMEOUT, ARG_VERBOSE),
    ),
    ActionCommand(
        name="reset",
        help="Burn down and rebuild the metadata database",
        func=lazy_load_command("airflow.cli.commands.db_command.resetdb"),
        args=(ARG_YES, ARG_DB_SKIP_INIT, ARG_VERBOSE),
    ),
    ActionCommand(
        name="upgrade",
        help="Upgrade the metadata database to latest version",
        description=(
            "Upgrade the schema of the metadata database. "
            "To print but not execute commands, use option ``--show-sql-only``. "
            "If using options ``--from-revision`` or ``--from-version``, you must also use "
            "``--show-sql-only``, because if actually *running* migrations, we should only "
            "migrate from the *current* Alembic revision."
        ),
        func=lazy_load_command("airflow.cli.commands.db_command.upgradedb"),
        # Upgrade and downgrade each take their own revision/version argument objects.
        args=(
            ARG_DB_REVISION__UPGRADE,
            ARG_DB_VERSION__UPGRADE,
            ARG_DB_SQL_ONLY,
            ARG_DB_FROM_REVISION,
            ARG_DB_FROM_VERSION,
            ARG_DB_RESERIALIZE_DAGS,
            ARG_VERBOSE,
        ),
    ),
    ActionCommand(
        name="downgrade",
        help="Downgrade the schema of the metadata database.",
        description=(
            "Downgrade the schema of the metadata database. "
            "You must provide either `--to-revision` or `--to-version`. "
            "To print but not execute commands, use option `--show-sql-only`. "
            "If using options `--from-revision` or `--from-version`, you must also use `--show-sql-only`, "
            "because if actually *running* migrations, we should only migrate from the *current* Alembic "
            "revision."
        ),
        func=lazy_load_command("airflow.cli.commands.db_command.downgrade"),
        args=(
            ARG_DB_REVISION__DOWNGRADE,
            ARG_DB_VERSION__DOWNGRADE,
            ARG_DB_SQL_ONLY,
            ARG_YES,
            ARG_DB_FROM_REVISION,
            ARG_DB_FROM_VERSION,
            ARG_VERBOSE,
        ),
    ),
    ActionCommand(
        name="shell",
        help="Runs a shell to access the database",
        func=lazy_load_command("airflow.cli.commands.db_command.shell"),
        args=(ARG_VERBOSE,),
    ),
    ActionCommand(
        name="check",
        help="Check if the database can be reached",
        func=lazy_load_command("airflow.cli.commands.db_command.check"),
        args=(ARG_VERBOSE,),
    ),
    ActionCommand(
        name="clean",
        help="Purge old records in metastore tables",
        func=lazy_load_command("airflow.cli.commands.db_command.cleanup_tables"),
        args=(
            ARG_DB_TABLES,
            ARG_DB_DRY_RUN,
            ARG_DB_CLEANUP_TIMESTAMP,
            ARG_VERBOSE,
            ARG_YES,
            ARG_DB_SKIP_ARCHIVE,
        ),
    ),
    ActionCommand(
        name="export-archived",
        help="Export archived data from the archive tables",
        func=lazy_load_command("airflow.cli.commands.db_command.export_archived"),
        args=(
            ARG_DB_EXPORT_FORMAT,
            ARG_DB_OUTPUT_PATH,
            ARG_DB_DROP_ARCHIVES,
            ARG_DB_TABLES,
            ARG_YES,
        ),
    ),
    ActionCommand(
        name="drop-archived",
        help="Drop archived tables created through the db clean command",
        func=lazy_load_command("airflow.cli.commands.db_command.drop_archived"),
        args=(ARG_DB_TABLES, ARG_YES),
    ),
)
# Subcommands of the ``connections`` group (registered in ``core_commands``). Implementations
# live in ``airflow.cli.commands.connection_command`` and are imported lazily through
# ``lazy_load_command``.
CONNECTIONS_COMMANDS = (
    ActionCommand(
        name="get",
        help="Get a connection",
        func=lazy_load_command("airflow.cli.commands.connection_command.connections_get"),
        args=(ARG_CONN_ID, ARG_COLOR, ARG_OUTPUT, ARG_VERBOSE),
    ),
    ActionCommand(
        name="list",
        help="List connections",
        func=lazy_load_command("airflow.cli.commands.connection_command.connections_list"),
        args=(ARG_OUTPUT, ARG_VERBOSE, ARG_CONN_ID_FILTER),
    ),
    ActionCommand(
        name="add",
        help="Add a connection",
        func=lazy_load_command("airflow.cli.commands.connection_command.connections_add"),
        # A connection can be given as a URI/JSON document or through the per-field arguments.
        args=(ARG_CONN_ID, ARG_CONN_URI, ARG_CONN_JSON, ARG_CONN_EXTRA) + tuple(ALTERNATIVE_CONN_SPECS_ARGS),
    ),
    ActionCommand(
        name="delete",
        help="Delete a connection",
        func=lazy_load_command("airflow.cli.commands.connection_command.connections_delete"),
        args=(ARG_CONN_ID, ARG_COLOR, ARG_VERBOSE),
    ),
    ActionCommand(
        name="export",
        help="Export all connections",
        # Example command lines are meant to be copy-pasted, so they carry no trailing punctuation.
        description=(
            "All connections can be exported in STDOUT using the following command:\n"
            "airflow connections export -\n"
            "The file format can be determined by the provided file extension. E.g., The following "
            "command will export the connections in JSON format:\n"
            "airflow connections export /tmp/connections.json\n"
            "The --file-format parameter can be used to control the file format. E.g., "
            "the default format is JSON in STDOUT mode, which can be overridden using: \n"
            "airflow connections export - --file-format yaml\n"
            "The --file-format parameter can also be used for the files, for example:\n"
            "airflow connections export /tmp/connections --file-format json\n"
            "When exporting in `env` file format, you control whether URI format or JSON format "
            "is used to serialize the connection by passing `uri` or `json` with option "
            "`--serialization-format`.\n"
        ),
        func=lazy_load_command("airflow.cli.commands.connection_command.connections_export"),
        args=(
            ARG_CONN_EXPORT,
            ARG_CONN_EXPORT_FORMAT,
            ARG_CONN_EXPORT_FILE_FORMAT,
            ARG_CONN_SERIALIZATION_FORMAT,
            ARG_VERBOSE,
        ),
    ),
    ActionCommand(
        name="import",
        help="Import connections from a file",
        description=(
            "Connections can be imported from the output of the export command.\n"
            "The filetype must be json, yaml or env and will be automatically inferred."
        ),
        func=lazy_load_command("airflow.cli.commands.connection_command.connections_import"),
        args=(
            ARG_CONN_IMPORT,
            ARG_CONN_OVERWRITE,
            ARG_VERBOSE,
        ),
    ),
    ActionCommand(
        name="test",
        help="Test a connection",
        func=lazy_load_command("airflow.cli.commands.connection_command.connections_test"),
        args=(ARG_CONN_ID, ARG_VERBOSE),
    ),
)
# Subcommands of the ``providers`` group (registered in ``core_commands``). Implementations
# live in ``airflow.cli.commands.provider_command`` and are imported lazily through
# ``lazy_load_command``.
PROVIDERS_COMMANDS = (
    ActionCommand(
        name="list",
        help="List installed providers",
        func=lazy_load_command("airflow.cli.commands.provider_command.providers_list"),
        args=(ARG_OUTPUT, ARG_VERBOSE),
    ),
    ActionCommand(
        name="get",
        help="Get detailed information about a provider",
        func=lazy_load_command("airflow.cli.commands.provider_command.provider_get"),
        args=(ARG_OUTPUT, ARG_VERBOSE, ARG_FULL, ARG_COLOR, ARG_PROVIDER_NAME),
    ),
    ActionCommand(
        name="links",
        help="List extra links registered by the providers",
        func=lazy_load_command("airflow.cli.commands.provider_command.extra_links_list"),
        args=(ARG_OUTPUT, ARG_VERBOSE),
    ),
    ActionCommand(
        name="widgets",
        help="Get information about registered connection form widgets",
        func=lazy_load_command("airflow.cli.commands.provider_command.connection_form_widget_list"),
        args=(
            ARG_OUTPUT,
            ARG_VERBOSE,
        ),
    ),
    ActionCommand(
        name="hooks",
        help="List registered provider hooks",
        func=lazy_load_command("airflow.cli.commands.provider_command.hooks_list"),
        args=(ARG_OUTPUT, ARG_VERBOSE),
    ),
    ActionCommand(
        name="triggers",
        help="List registered provider triggers",
        func=lazy_load_command("airflow.cli.commands.provider_command.triggers_list"),
        args=(ARG_OUTPUT, ARG_VERBOSE),
    ),
    ActionCommand(
        name="behaviours",
        help="Get information about registered connection types with custom behaviours",
        func=lazy_load_command("airflow.cli.commands.provider_command.connection_field_behaviours"),
        args=(ARG_OUTPUT, ARG_VERBOSE),
    ),
    ActionCommand(
        name="logging",
        help="Get information about task logging handlers provided",
        func=lazy_load_command("airflow.cli.commands.provider_command.logging_list"),
        args=(ARG_OUTPUT, ARG_VERBOSE),
    ),
    ActionCommand(
        name="secrets",
        help="Get information about secrets backends provided",
        func=lazy_load_command("airflow.cli.commands.provider_command.secrets_backends_list"),
        args=(ARG_OUTPUT, ARG_VERBOSE),
    ),
    ActionCommand(
        name="auth",
        help="Get information about API auth backends provided",
        func=lazy_load_command("airflow.cli.commands.provider_command.auth_backend_list"),
        args=(ARG_OUTPUT, ARG_VERBOSE),
    ),
)
# Subcommands of the ``users`` group (registered in ``core_commands``). Implementations live
# in ``airflow.cli.commands.user_command`` and are imported lazily through ``lazy_load_command``.
USERS_COMMANDS = (
    ActionCommand(
        name="list",
        help="List users",
        func=lazy_load_command("airflow.cli.commands.user_command.users_list"),
        args=(ARG_OUTPUT, ARG_VERBOSE),
    ),
    ActionCommand(
        name="create",
        help="Create a user",
        func=lazy_load_command("airflow.cli.commands.user_command.users_create"),
        args=(
            ARG_ROLE,
            ARG_USERNAME,
            ARG_EMAIL,
            ARG_FIRSTNAME,
            ARG_LASTNAME,
            ARG_PASSWORD,
            ARG_USE_RANDOM_PASSWORD,
            ARG_VERBOSE,
        ),
        # The doubled backslashes render as shell line continuations in the example.
        epilog=(
            "examples:\n"
            'To create an user with "Admin" role and username equals to "admin", run:\n'
            "\n"
            " $ airflow users create \\\n"
            " --username admin \\\n"
            " --firstname FIRST_NAME \\\n"
            " --lastname LAST_NAME \\\n"
            " --role Admin \\\n"
            " --email admin@example.org"
        ),
    ),
    # delete / add-role / remove-role take the "optional" username and email argument objects.
    ActionCommand(
        name="delete",
        help="Delete a user",
        func=lazy_load_command("airflow.cli.commands.user_command.users_delete"),
        args=(ARG_USERNAME_OPTIONAL, ARG_EMAIL_OPTIONAL, ARG_VERBOSE),
    ),
    ActionCommand(
        name="add-role",
        help="Add role to a user",
        func=lazy_load_command("airflow.cli.commands.user_command.add_role"),
        args=(ARG_USERNAME_OPTIONAL, ARG_EMAIL_OPTIONAL, ARG_ROLE, ARG_VERBOSE),
    ),
    ActionCommand(
        name="remove-role",
        help="Remove role from a user",
        func=lazy_load_command("airflow.cli.commands.user_command.remove_role"),
        args=(ARG_USERNAME_OPTIONAL, ARG_EMAIL_OPTIONAL, ARG_ROLE, ARG_VERBOSE),
    ),
    ActionCommand(
        name="import",
        help="Import users",
        func=lazy_load_command("airflow.cli.commands.user_command.users_import"),
        args=(ARG_USER_IMPORT, ARG_VERBOSE),
    ),
    ActionCommand(
        name="export",
        help="Export all users",
        func=lazy_load_command("airflow.cli.commands.user_command.users_export"),
        args=(ARG_USER_EXPORT, ARG_VERBOSE),
    ),
)
# Subcommands of the ``roles`` group (registered in ``core_commands``). Implementations live
# in ``airflow.cli.commands.role_command`` and are imported lazily through ``lazy_load_command``.
ROLES_COMMANDS = (
    ActionCommand(
        name="list",
        help="List roles",
        func=lazy_load_command("airflow.cli.commands.role_command.roles_list"),
        args=(ARG_PERMISSIONS, ARG_OUTPUT, ARG_VERBOSE),
    ),
    ActionCommand(
        name="create",
        help="Create role",
        func=lazy_load_command("airflow.cli.commands.role_command.roles_create"),
        args=(ARG_ROLES, ARG_VERBOSE),
    ),
    ActionCommand(
        name="delete",
        help="Delete role",
        func=lazy_load_command("airflow.cli.commands.role_command.roles_delete"),
        args=(ARG_ROLES, ARG_VERBOSE),
    ),
    ActionCommand(
        name="add-perms",
        help="Add roles permissions",
        func=lazy_load_command("airflow.cli.commands.role_command.roles_add_perms"),
        # add-perms takes ARG_ROLE_ACTION_REQUIRED, whereas del-perms takes ARG_ROLE_ACTION.
        args=(ARG_ROLES, ARG_ROLE_RESOURCE, ARG_ROLE_ACTION_REQUIRED, ARG_VERBOSE),
    ),
    ActionCommand(
        name="del-perms",
        help="Delete roles permissions",
        func=lazy_load_command("airflow.cli.commands.role_command.roles_del_perms"),
        args=(ARG_ROLES, ARG_ROLE_RESOURCE, ARG_ROLE_ACTION, ARG_VERBOSE),
    ),
    ActionCommand(
        name="export",
        help="Export roles (without permissions) from db to JSON file",
        func=lazy_load_command("airflow.cli.commands.role_command.roles_export"),
        args=(ARG_ROLE_EXPORT, ARG_ROLE_EXPORT_FMT, ARG_VERBOSE),
    ),
    ActionCommand(
        name="import",
        help="Import roles (without permissions) from JSON file to db",
        func=lazy_load_command("airflow.cli.commands.role_command.roles_import"),
        args=(ARG_ROLE_IMPORT, ARG_VERBOSE),
    ),
)
# Subcommands of the ``celery`` group (registered in ``core_commands``). Implementations live
# in ``airflow.cli.commands.celery_command`` and are imported lazily through
# ``lazy_load_command``.
CELERY_COMMANDS = (
    ActionCommand(
        name="worker",
        help="Start a Celery worker node",
        func=lazy_load_command("airflow.cli.commands.celery_command.worker"),
        args=(
            ARG_QUEUES,
            ARG_CONCURRENCY,
            ARG_CELERY_HOSTNAME,
            ARG_PID,
            ARG_DAEMON,
            ARG_UMASK,
            ARG_STDOUT,
            ARG_STDERR,
            ARG_LOG_FILE,
            ARG_AUTOSCALE,
            ARG_SKIP_SERVE_LOGS,
            ARG_WITHOUT_MINGLE,
            ARG_WITHOUT_GOSSIP,
            ARG_VERBOSE,
        ),
    ),
    ActionCommand(
        name="flower",
        help="Start a Celery Flower",
        func=lazy_load_command("airflow.cli.commands.celery_command.flower"),
        args=(
            ARG_FLOWER_HOSTNAME,
            ARG_FLOWER_PORT,
            ARG_FLOWER_CONF,
            ARG_FLOWER_URL_PREFIX,
            ARG_FLOWER_BASIC_AUTH,
            ARG_BROKER_API,
            ARG_PID,
            ARG_DAEMON,
            ARG_STDOUT,
            ARG_STDERR,
            ARG_LOG_FILE,
            ARG_VERBOSE,
        ),
    ),
    ActionCommand(
        name="stop",
        help="Stop the Celery worker gracefully",
        func=lazy_load_command("airflow.cli.commands.celery_command.stop_worker"),
        args=(ARG_PID, ARG_VERBOSE),
    ),
)
# Subcommands of the ``config`` group (registered in ``core_commands``). Implementations live
# in ``airflow.cli.commands.config_command`` and are imported lazily through
# ``lazy_load_command``.
CONFIG_COMMANDS = (
    ActionCommand(
        name="get-value",
        help="Print the value of the configuration",
        func=lazy_load_command("airflow.cli.commands.config_command.get_value"),
        args=(
            ARG_SECTION,
            ARG_OPTION,
            ARG_VERBOSE,
        ),
    ),
    ActionCommand(
        name="list",
        help="List options for the configuration",
        func=lazy_load_command("airflow.cli.commands.config_command.show_config"),
        args=(ARG_OPTIONAL_SECTION, ARG_COLOR, ARG_VERBOSE),
    ),
)
# Subcommands of the ``kubernetes`` group (registered in ``core_commands``). Implementations
# live in ``airflow.cli.commands.kubernetes_command`` and are imported lazily through
# ``lazy_load_command``.
KUBERNETES_COMMANDS = (
    ActionCommand(
        name="cleanup-pods",
        help=(
            "Clean up Kubernetes pods "
            "(created by KubernetesExecutor/KubernetesPodOperator) "
            "in evicted/failed/succeeded/pending states"
        ),
        func=lazy_load_command("airflow.cli.commands.kubernetes_command.cleanup_pods"),
        args=(ARG_NAMESPACE, ARG_MIN_PENDING_MINUTES, ARG_VERBOSE),
    ),
    ActionCommand(
        name="generate-dag-yaml",
        help="Generate YAML files for all tasks in DAG. Useful for debugging tasks without "
        "launching into a cluster",
        # The command is named ``generate-dag-yaml`` but resolves to ``generate_pod_yaml``.
        func=lazy_load_command("airflow.cli.commands.kubernetes_command.generate_pod_yaml"),
        args=(ARG_DAG_ID, ARG_EXECUTION_DATE, ARG_SUBDIR, ARG_OUTPUT_PATH, ARG_VERBOSE),
    ),
)
# Subcommands of the ``jobs`` group (registered in ``core_commands``). The implementation
# lives in ``airflow.cli.commands.jobs_command`` and is imported lazily through
# ``lazy_load_command``.
JOBS_COMMANDS = (
    ActionCommand(
        name="check",
        help="Checks if job(s) are still alive",
        func=lazy_load_command("airflow.cli.commands.jobs_command.check"),
        args=(
            ARG_JOB_TYPE_FILTER,
            ARG_JOB_HOSTNAME_FILTER,
            ARG_JOB_HOSTNAME_CALLABLE_FILTER,
            ARG_JOB_LIMIT,
            ARG_ALLOW_MULTIPLE,
            ARG_VERBOSE,
        ),
        # Example commands are meant to be copy-pasted into a shell, so each one must be a
        # complete, valid command line (no unbalanced quotes or trailing punctuation).
        epilog=(
            "examples:\n"
            "To check if the local scheduler is still working properly, run:\n"
            "\n"
            " $ airflow jobs check --job-type SchedulerJob --local\n"
            "\n"
            "To check if any scheduler is running when you are using high availability, run:\n"
            "\n"
            " $ airflow jobs check --job-type SchedulerJob --allow-multiple --limit 100"
        ),
    ),
)
# Top-level command definitions: a mix of command groups (whose subcommands are the
# ``*_COMMANDS`` tuples defined above) and stand-alone action commands. This is a mutable
# list; the ``internal-api`` command is appended to it below when ``_ENABLE_AIP_44`` is set.
core_commands: list[CLICommand] = [
    GroupCommand(
        name="dags",
        help="Manage DAGs",
        subcommands=DAGS_COMMANDS,
    ),
    GroupCommand(
        name="kubernetes", help="Tools to help run the KubernetesExecutor", subcommands=KUBERNETES_COMMANDS
    ),
    GroupCommand(
        name="tasks",
        help="Manage tasks",
        subcommands=TASKS_COMMANDS,
    ),
    GroupCommand(
        name="pools",
        help="Manage pools",
        subcommands=POOLS_COMMANDS,
    ),
    GroupCommand(
        name="variables",
        help="Manage variables",
        subcommands=VARIABLES_COMMANDS,
    ),
    GroupCommand(
        name="jobs",
        help="Manage jobs",
        subcommands=JOBS_COMMANDS,
    ),
    GroupCommand(
        name="db",
        help="Database operations",
        subcommands=DB_COMMANDS,
    ),
    ActionCommand(
        name="kerberos",
        help="Start a kerberos ticket renewer",
        func=lazy_load_command("airflow.cli.commands.kerberos_command.kerberos"),
        args=(
            ARG_PRINCIPAL,
            ARG_KEYTAB,
            ARG_PID,
            ARG_DAEMON,
            ARG_STDOUT,
            ARG_STDERR,
            ARG_LOG_FILE,
            ARG_VERBOSE,
        ),
    ),
    ActionCommand(
        name="webserver",
        help="Start a Airflow webserver instance",
        func=lazy_load_command("airflow.cli.commands.webserver_command.webserver"),
        args=(
            ARG_PORT,
            ARG_WORKERS,
            ARG_WORKERCLASS,
            ARG_WORKER_TIMEOUT,
            ARG_HOSTNAME,
            ARG_PID,
            ARG_DAEMON,
            ARG_STDOUT,
            ARG_STDERR,
            ARG_ACCESS_LOGFILE,
            ARG_ERROR_LOGFILE,
            ARG_ACCESS_LOGFORMAT,
            ARG_LOG_FILE,
            ARG_SSL_CERT,
            ARG_SSL_KEY,
            ARG_DEBUG,
        ),
    ),
    ActionCommand(
        name="scheduler",
        help="Start a scheduler instance",
        func=lazy_load_command("airflow.cli.commands.scheduler_command.scheduler"),
        args=(
            ARG_SUBDIR,
            ARG_NUM_RUNS,
            ARG_DO_PICKLE,
            ARG_PID,
            ARG_DAEMON,
            ARG_STDOUT,
            ARG_STDERR,
            ARG_LOG_FILE,
            ARG_SKIP_SERVE_LOGS,
            ARG_VERBOSE,
        ),
        epilog=(
            "Signals:\n"
            "\n"
            " - SIGUSR2: Dump a snapshot of task state being tracked by the executor.\n"
            "\n"
            " Example:\n"
            ' pkill -f -USR2 "airflow scheduler"'
        ),
    ),
    ActionCommand(
        name="triggerer",
        help="Start a triggerer instance",
        func=lazy_load_command("airflow.cli.commands.triggerer_command.triggerer"),
        args=(
            ARG_PID,
            ARG_DAEMON,
            ARG_STDOUT,
            ARG_STDERR,
            ARG_LOG_FILE,
            ARG_CAPACITY,
            ARG_VERBOSE,
            ARG_SKIP_SERVE_LOGS,
        ),
    ),
    ActionCommand(
        name="dag-processor",
        help="Start a standalone Dag Processor instance",
        func=lazy_load_command("airflow.cli.commands.dag_processor_command.dag_processor"),
        args=(
            ARG_PID,
            ARG_DAEMON,
            ARG_SUBDIR,
            ARG_NUM_RUNS,
            ARG_DO_PICKLE,
            ARG_STDOUT,
            ARG_STDERR,
            ARG_LOG_FILE,
            ARG_VERBOSE,
        ),
    ),
    ActionCommand(
        name="version",
        help="Show the version",
        func=lazy_load_command("airflow.cli.commands.version_command.version"),
        args=(),
    ),
    ActionCommand(
        name="cheat-sheet",
        help="Display cheat sheet",
        func=lazy_load_command("airflow.cli.commands.cheat_sheet_command.cheat_sheet"),
        args=(ARG_VERBOSE,),
    ),
    GroupCommand(
        name="connections",
        help="Manage connections",
        subcommands=CONNECTIONS_COMMANDS,
    ),
    GroupCommand(
        name="providers",
        help="Display providers",
        subcommands=PROVIDERS_COMMANDS,
    ),
    GroupCommand(
        name="users",
        help="Manage users",
        subcommands=USERS_COMMANDS,
    ),
    GroupCommand(
        name="roles",
        help="Manage roles",
        subcommands=ROLES_COMMANDS,
    ),
    ActionCommand(
        name="sync-perm",
        help="Update permissions for existing roles and optionally DAGs",
        func=lazy_load_command("airflow.cli.commands.sync_perm_command.sync_perm"),
        args=(ARG_INCLUDE_DAGS, ARG_VERBOSE),
    ),
    ActionCommand(
        name="rotate-fernet-key",
        func=lazy_load_command("airflow.cli.commands.rotate_fernet_key_command.rotate_fernet_key"),
        help="Rotate encrypted connection credentials and variables",
        description=(
            "Rotate all encrypted connection credentials and variables; see "
            "https://airflow.apache.org/docs/apache-airflow/stable/howto/secure-connections.html"
            "#rotating-encryption-keys"
        ),
        args=(),
    ),
    GroupCommand(name="config", help="View configuration", subcommands=CONFIG_COMMANDS),
    ActionCommand(
        name="info",
        help="Show information about current Airflow and environment",
        func=lazy_load_command("airflow.cli.commands.info_command.show_info"),
        args=(
            ARG_ANONYMIZE,
            ARG_FILE_IO,
            ARG_VERBOSE,
            ARG_OUTPUT,
        ),
    ),
    ActionCommand(
        name="plugins",
        help="Dump information about loaded plugins",
        func=lazy_load_command("airflow.cli.commands.plugins_command.dump_plugins"),
        args=(ARG_OUTPUT, ARG_VERBOSE),
    ),
    GroupCommand(
        name="celery",
        help="Celery components",
        description=(
            "Start celery components. Works only when using CeleryExecutor. For more information, see "
            "https://airflow.apache.org/docs/apache-airflow/stable/executor/celery.html"
        ),
        subcommands=CELERY_COMMANDS,
    ),
    ActionCommand(
        name="standalone",
        help="Run an all-in-one copy of Airflow",
        func=lazy_load_command("airflow.cli.commands.standalone_command.standalone"),
        args=tuple(),
    ),
]
# Register the ``internal-api`` command only when the ``_ENABLE_AIP_44`` flag imported from
# ``airflow.settings`` is truthy; otherwise the command is not part of ``core_commands`` at all.
if _ENABLE_AIP_44:
    core_commands.append(
        ActionCommand(
            name="internal-api",
            help="Start a Airflow Internal API instance",
            func=lazy_load_command("airflow.cli.commands.internal_api_command.internal_api"),
            # Uses its own ARG_INTERNAL_API_* arguments for port, workers, hostname and
            # access/error logging, plus the shared daemon, logging and SSL arguments.
            args=(
                ARG_INTERNAL_API_PORT,
                ARG_INTERNAL_API_WORKERS,
                ARG_INTERNAL_API_WORKERCLASS,
                ARG_INTERNAL_API_WORKER_TIMEOUT,
                ARG_INTERNAL_API_HOSTNAME,
                ARG_PID,
                ARG_DAEMON,
                ARG_STDOUT,
                ARG_STDERR,
                ARG_INTERNAL_API_ACCESS_LOGFILE,
                ARG_INTERNAL_API_ERROR_LOGFILE,
                ARG_INTERNAL_API_ACCESS_LOGFORMAT,
                ARG_LOG_FILE,
                ARG_SSL_CERT,
                ARG_SSL_KEY,
                ARG_DEBUG,
            ),
        ),
    )
def _remove_dag_id_opt(command: ActionCommand) -> ActionCommand:
    """Return a copy of ``command`` whose arguments no longer include ``ARG_DAG_ID``.

    Arguments are compared by identity, so only the ``ARG_DAG_ID`` object itself is
    dropped; other dag-id flavoured arguments (e.g. ``ARG_DAG_ID_REQ_FLAG``) are kept.
    All other fields of the command are carried over unchanged.

    :param command: the command definition to copy.
    :return: a new ``ActionCommand``; the input is not modified.
    """
    # Materialise the filtered arguments as a tuple. A generator expression would be
    # exhausted after the first pass, leaving the command with no arguments for any
    # consumer that iterates ``args`` a second time.
    remaining_args = tuple(arg for arg in command.args if arg is not ARG_DAG_ID)
    return command._replace(args=remaining_args)
# Reduced command set: selected ``dags`` and ``tasks`` subcommands, each passed through
# ``_remove_dag_id_opt`` so that the ARG_DAG_ID argument is dropped from its definition.
dag_cli_commands: list[CLICommand] = [
    GroupCommand(
        name="dags",
        help="Manage DAGs",
        subcommands=[
            _remove_dag_id_opt(dag_subcommand)
            for dag_subcommand in DAGS_COMMANDS
            if dag_subcommand.name in ("backfill", "list-runs", "pause", "unpause", "test")
        ],
    ),
    GroupCommand(
        name="tasks",
        help="Manage tasks",
        subcommands=[
            _remove_dag_id_opt(task_subcommand)
            for task_subcommand in TASKS_COMMANDS
            if task_subcommand.name in ("list", "test", "run")
        ],
    ),
]
# Lookup table from group name ("dags", "tasks") to its reduced group definition.
DAG_CLI_DICT: dict[str, CLICommand] = {group.name: group for group in dag_cli_commands}