#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""Command-line interface"""
import argparse
import os
import textwrap
from argparse import RawTextHelpFormatter
from itertools import filterfalse, tee
from typing import Callable
from tabulate import tabulate_formats
from airflow import api, settings
from airflow.configuration import conf
from airflow.executors.executor_loader import ExecutorLoader
from airflow.utils.cli import alternative_conn_specs
from airflow.utils.module_loading import import_string
from airflow.utils.timezone import parse as parsedate
api.load_auth()
DAGS_FOLDER = settings.DAGS_FOLDER
BUILD_DOCS = "BUILDING_AIRFLOW_DOCS" in os.environ
if BUILD_DOCS:
DAGS_FOLDER = '[AIRFLOW_HOME]/dags'
def lazy_load_command(import_path: str) -> Callable:
"""Create a lazy loader for command"""
_, _, name = import_path.rpartition('.')
def command(*args, **kwargs):
func = import_string(import_path)
return func(*args, **kwargs)
command.__name__ = name # type: ignore
return command
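# Example (a minimal sketch, not executed here): the command module is imported
# only when the returned callable is invoked, which keeps CLI start-up cheap.
#
#   version_cmd = lazy_load_command('airflow.cli.commands.version_command.version')
#   version_cmd(args)  # 'airflow.cli.commands.version_command' is imported here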
class Arg:
"""Class to keep information about command line argument"""
# pylint: disable=redefined-builtin
def __init__(self, flags=None, help=None, action=None, default=None, nargs=None,
type=None, choices=None, required=None, metavar=None):
self.flags = flags
self.help = help
self.action = action
self.default = default
self.nargs = nargs
self.type = type
self.choices = choices
self.required = required
self.metavar = metavar
# pylint: enable=redefined-builtin
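# Illustrative note (an assumption about intent, based on _add_subcommand below):
# an Arg only records keyword arguments that are later forwarded to
# argparse.ArgumentParser.add_argument, with falsy fields skipped. For example,
# the 'dry_run' entry defined below behaves roughly like:
#
#   parser.add_argument("-dr", "--dry_run", help="Perform a dry run",
#                       action="store_true")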
class CLIFactory:
"""
Factory class which generates command line argument parser and holds information
about all available Airflow commands
"""
args = {
# Shared
'dag_id': Arg(("dag_id",), "The id of the dag"),
'task_id': Arg(("task_id",), "The id of the task"),
'execution_date': Arg(
("execution_date",), help="The execution date of the DAG",
type=parsedate),
'task_regex': Arg(
("-t", "--task_regex"),
"The regex to filter specific task_ids to backfill (optional)"),
'subdir': Arg(
("-sd", "--subdir"),
"File location or directory from which to look for the dag. "
"Defaults to '[AIRFLOW_HOME]/dags' where [AIRFLOW_HOME] is the "
"value you set for 'AIRFLOW_HOME' config you set in 'airflow.cfg' ",
default=DAGS_FOLDER),
'start_date': Arg(
("-s", "--start_date"), "Override start_date YYYY-MM-DD",
type=parsedate),
'end_date': Arg(
("-e", "--end_date"), "Override end_date YYYY-MM-DD",
type=parsedate),
'dry_run': Arg(
("-dr", "--dry_run"), "Perform a dry run", "store_true"),
'pid': Arg(
("--pid",), "PID file location",
nargs='?'),
'daemon': Arg(
("-D", "--daemon"), "Daemonize instead of running "
"in the foreground",
"store_true"),
'stderr': Arg(
("--stderr",), "Redirect stderr to this file"),
'stdout': Arg(
("--stdout",), "Redirect stdout to this file"),
'log_file': Arg(
("-l", "--log-file"), "Location of the log file"),
'yes': Arg(
("-y", "--yes"),
"Do not prompt to confirm reset. Use with care!",
"store_true",
default=False),
'output': Arg(
("--output",), (
"Output table format. The specified value is passed to "
"the tabulate module (https://pypi.org/project/tabulate/). "
"Valid values are: ({})".format("|".join(tabulate_formats))
),
choices=tabulate_formats,
default="fancy_grid"),
# list_dag_runs
'no_backfill': Arg(
("--no_backfill",),
"filter all the backfill dagruns given the dag id", "store_true"),
'state': Arg(
("--state",),
"Only list the dag runs corresponding to the state"
),
# list_jobs
'limit': Arg(
("--limit",),
"Return a limited number of records"
),
# backfill
'mark_success': Arg(
("-m", "--mark_success"),
"Mark jobs as succeeded without running them", "store_true"),
'verbose': Arg(
("-v", "--verbose"),
"Make logging output more verbose", "store_true"),
'local': Arg(
("-l", "--local"),
"Run the task using the LocalExecutor", "store_true"),
'donot_pickle': Arg(
("-x", "--donot_pickle"), (
"Do not attempt to pickle the DAG object to send over "
"to the workers, just tell the workers to run their version "
"of the code"),
"store_true"),
'bf_ignore_dependencies': Arg(
("-i", "--ignore_dependencies"),
(
"Skip upstream tasks, run only the tasks "
"matching the regexp. Only works in conjunction "
"with task_regex"),
"store_true"),
'bf_ignore_first_depends_on_past': Arg(
("-I", "--ignore_first_depends_on_past"),
(
"Ignores depends_on_past dependencies for the first "
"set of tasks only (subsequent executions in the backfill "
"DO respect depends_on_past)"),
"store_true"),
'pool': Arg(("--pool",), "Resource pool to use"),
'delay_on_limit': Arg(
("--delay_on_limit",),
help=("Amount of time in seconds to wait when the limit "
"on maximum active dag runs (max_active_runs) has "
"been reached before trying to execute a dag run "
"again"),
type=float,
default=1.0),
'reset_dag_run': Arg(
("--reset_dagruns",),
(
"if set, the backfill will delete existing "
"backfill-related DAG runs and start "
"anew with fresh, running DAG runs"),
"store_true"),
'rerun_failed_tasks': Arg(
("--rerun_failed_tasks",),
(
"if set, the backfill will auto-rerun "
"all the failed tasks for the backfill date range "
"instead of throwing exceptions"),
"store_true"),
'run_backwards': Arg(
("-B", "--run_backwards",),
(
"if set, the backfill will run tasks from the most "
"recent day first. if there are tasks that depend_on_past "
"this option will throw an exception"),
"store_true"),
# list_tasks
'tree': Arg(("-t", "--tree"), "Tree view", "store_true"),
# list_dags
'report': Arg(
("-r", "--report"), "Show DagBag loading report", "store_true"),
# clear
'upstream': Arg(
("-u", "--upstream"), "Include upstream tasks", "store_true"),
'only_failed': Arg(
("-f", "--only_failed"), "Only failed jobs", "store_true"),
'only_running': Arg(
("-r", "--only_running"), "Only running jobs", "store_true"),
'downstream': Arg(
("-d", "--downstream"), "Include downstream tasks", "store_true"),
'exclude_subdags': Arg(
("-x", "--exclude_subdags"),
"Exclude subdags", "store_true"),
'exclude_parentdag': Arg(
("-xp", "--exclude_parentdag"),
"Exclude ParentDAGS if the task cleared is a part of a SubDAG",
"store_true"),
'dag_regex': Arg(
("-dx", "--dag_regex"),
"Search dag_id as regex instead of exact string", "store_true"),
# show_dag
'save': Arg(
("-s", "--save"),
"Saves the result to the indicated file.\n"
"\n"
"The file format is determined by the file extension. For more information about supported "
"format, see: https://www.graphviz.org/doc/info/output.html\n"
"\n"
"If you want to create a PNG file then you should execute the following command:\n"
"airflow dags show <DAG_ID> --save output.png\n"
"\n"
"If you want to create a DOT file then you should execute the following command:\n"
"airflow dags show <DAG_ID> --save output.dot\n"
),
'imgcat': Arg(
("--imgcat", ),
"Displays graph using the imgcat tool. \n"
"\n"
"For more information, see: https://www.iterm2.com/documentation-images.html",
action='store_true'),
# trigger_dag
'run_id': Arg(("-r", "--run_id"), "Helps to identify this run"),
'conf': Arg(
('-c', '--conf'),
"JSON string that gets pickled into the DagRun's conf attribute"),
'exec_date': Arg(
("-e", "--exec_date"), help="The execution date of the DAG",
type=parsedate),
# pool
'pool_name': Arg(
("pool",),
metavar='NAME',
help="Pool name"),
'pool_slots': Arg(
("slots",),
type=int,
help="Pool slots"),
'pool_description': Arg(
("description",),
help="Pool description"),
'pool_import': Arg(
("file",),
metavar="FILEPATH",
help="Import pools from JSON file"),
'pool_export': Arg(
("file",),
metavar="FILEPATH",
help="Export all pools to JSON file"),
# variables
'var': Arg(
("key",),
help="Variable key"),
'var_value': Arg(
("value",),
metavar='VALUE',
help="Variable value"),
'default': Arg(
("-d", "--default"),
metavar="VAL",
default=None,
help="Default value returned if variable does not exist"),
'json': Arg(
("-j", "--json"),
help="Deserialize JSON variable",
action="store_true"),
'var_import': Arg(
("file",),
help="Import variables from JSON file"),
'var_export': Arg(
("file",),
help="Export all variables to JSON file"),
# kerberos
'principal': Arg(
("principal",), "kerberos principal", nargs='?'),
'keytab': Arg(
("-kt", "--keytab"), "keytab",
nargs='?', default=conf.get('kerberos', 'keytab')),
# run
# TODO(aoen): "force" is a poor choice of name here since it implies it overrides
# all dependencies (not just past success), e.g. the ignore_depends_on_past
# dependency. This flag should be deprecated and renamed to 'ignore_ti_state' and
# the "ignore_all_dependencies" command should be called the"force" command
# instead.
'interactive': Arg(
('-int', '--interactive'),
help='Do not capture standard output and error streams '
'(useful for interactive debugging)',
action='store_true'),
'force': Arg(
("-f", "--force"),
"Ignore previous task instance state, rerun regardless if task already "
"succeeded/failed",
"store_true"),
'raw': Arg(("-r", "--raw"), argparse.SUPPRESS, "store_true"),
'ignore_all_dependencies': Arg(
("-A", "--ignore_all_dependencies"),
"Ignores all non-critical dependencies, including ignore_ti_state and "
"ignore_task_deps",
"store_true"),
# TODO(aoen): ignore_dependencies is a poor choice of name here because it is too
# vague (e.g. a task being in the appropriate state to be run is also a dependency
# but is not ignored by this flag), the name 'ignore_task_dependencies' is
# slightly better (as it ignores all dependencies that are specific to the task),
# so deprecate the old command name and use this instead.
'ignore_dependencies': Arg(
("-i", "--ignore_dependencies"),
"Ignore task-specific dependencies, e.g. upstream, depends_on_past, and "
"retry delay dependencies",
"store_true"),
'ignore_depends_on_past': Arg(
("-I", "--ignore_depends_on_past"),
"Ignore depends_on_past dependencies (but respect "
"upstream dependencies)",
"store_true"),
'ship_dag': Arg(
("--ship_dag",),
"Pickles (serializes) the DAG and ships it to the worker",
"store_true"),
'pickle': Arg(
("-p", "--pickle"),
"Serialized pickle object of the entire dag (used internally)"),
'job_id': Arg(("-j", "--job_id"), argparse.SUPPRESS),
'cfg_path': Arg(
("--cfg_path",), "Path to config file to use instead of airflow.cfg"),
# webserver
'port': Arg(
("-p", "--port"),
default=conf.get('webserver', 'WEB_SERVER_PORT'),
type=int,
help="The port on which to run the server"),
'ssl_cert': Arg(
("--ssl_cert",),
default=conf.get('webserver', 'WEB_SERVER_SSL_CERT'),
help="Path to the SSL certificate for the webserver"),
'ssl_key': Arg(
("--ssl_key",),
default=conf.get('webserver', 'WEB_SERVER_SSL_KEY'),
help="Path to the key to use with the SSL certificate"),
'workers': Arg(
("-w", "--workers"),
default=conf.get('webserver', 'WORKERS'),
type=int,
help="Number of workers to run the webserver on"),
'workerclass': Arg(
("-k", "--workerclass"),
default=conf.get('webserver', 'WORKER_CLASS'),
choices=['sync', 'eventlet', 'gevent', 'tornado'],
help="The worker class to use for Gunicorn"),
'worker_timeout': Arg(
("-t", "--worker_timeout"),
default=conf.get('webserver', 'WEB_SERVER_WORKER_TIMEOUT'),
type=int,
help="The timeout for waiting on webserver workers"),
'hostname': Arg(
("-hn", "--hostname"),
default=conf.get('webserver', 'WEB_SERVER_HOST'),
help="Set the hostname on which to run the web server"),
'debug': Arg(
("-d", "--debug"),
"Use the server that ships with Flask in debug mode",
"store_true"),
'access_logfile': Arg(
("-A", "--access_logfile"),
default=conf.get('webserver', 'ACCESS_LOGFILE'),
help="The logfile to store the webserver access log. Use '-' to print to "
"stderr"),
'error_logfile': Arg(
("-E", "--error_logfile"),
default=conf.get('webserver', 'ERROR_LOGFILE'),
help="The logfile to store the webserver error log. Use '-' to print to "
"stderr"),
# scheduler
'dag_id_opt': Arg(("-d", "--dag_id"), help="The id of the dag to run"),
'num_runs': Arg(
("-n", "--num_runs"),
default=conf.getint('scheduler', 'num_runs'), type=int,
help="Set the number of runs to execute before exiting"),
# worker
'do_pickle': Arg(
("-p", "--do_pickle"),
default=False,
help=(
"Attempt to pickle the DAG object to send over "
"to the workers, instead of letting workers run their version "
"of the code"),
action="store_true"),
'queues': Arg(
("-q", "--queues"),
help="Comma delimited list of queues to serve",
default=conf.get('celery', 'DEFAULT_QUEUE')),
'concurrency': Arg(
("-c", "--concurrency"),
type=int,
help="The number of worker processes",
default=conf.get('celery', 'worker_concurrency')),
'celery_hostname': Arg(
("-cn", "--celery_hostname"),
help=("Set the hostname of celery worker "
"if you have multiple workers on a single machine")),
# flower
        'broker_api': Arg(("-a", "--broker_api"), help="Broker API"),
'flower_hostname': Arg(
("-hn", "--hostname"),
default=conf.get('celery', 'FLOWER_HOST'),
help="Set the hostname on which to run the server"),
'flower_port': Arg(
("-p", "--port"),
default=conf.get('celery', 'FLOWER_PORT'),
type=int,
help="The port on which to run the server"),
'flower_conf': Arg(
("-fc", "--flower_conf"),
help="Configuration file for flower"),
'flower_url_prefix': Arg(
("-u", "--url_prefix"),
default=conf.get('celery', 'FLOWER_URL_PREFIX'),
help="URL prefix for Flower"),
'flower_basic_auth': Arg(
("-ba", "--basic_auth"),
default=conf.get('celery', 'FLOWER_BASIC_AUTH'),
help=("Securing Flower with Basic Authentication. "
"Accepts user:password pairs separated by a comma. "
"Example: flower_basic_auth = user1:password1,user2:password2")),
'task_params': Arg(
("-tp", "--task_params"),
help="Sends a JSON params dict to the task"),
'post_mortem': Arg(
("-pm", "--post_mortem"),
action="store_true",
help="Open debugger on uncaught exception",
),
# connections
'conn_id': Arg(
('conn_id',),
help='Connection id, required to add/delete a connection',
type=str),
'conn_uri': Arg(
('--conn_uri',),
help='Connection URI, required to add a connection without conn_type',
type=str),
'conn_type': Arg(
('--conn_type',),
help='Connection type, required to add a connection without conn_uri',
type=str),
'conn_host': Arg(
('--conn_host',),
help='Connection host, optional when adding a connection',
type=str),
'conn_login': Arg(
('--conn_login',),
help='Connection login, optional when adding a connection',
type=str),
'conn_password': Arg(
('--conn_password',),
help='Connection password, optional when adding a connection',
type=str),
'conn_schema': Arg(
('--conn_schema',),
help='Connection schema, optional when adding a connection',
type=str),
'conn_port': Arg(
('--conn_port',),
help='Connection port, optional when adding a connection',
type=str),
'conn_extra': Arg(
('--conn_extra',),
help='Connection `Extra` field, optional when adding a connection',
type=str),
# users
'username': Arg(
('--username',),
help='Username of the user',
required=True,
type=str),
'username_optional': Arg(
('--username',),
help='Username of the user',
type=str),
'firstname': Arg(
('--firstname',),
help='First name of the user',
required=True,
type=str),
'lastname': Arg(
('--lastname',),
help='Last name of the user',
required=True,
type=str),
'role': Arg(
('--role',),
help='Role of the user. Existing roles include Admin, '
'User, Op, Viewer, and Public',
required=True,
type=str,
),
'email': Arg(
('--email',),
help='Email of the user',
required=True,
type=str),
'email_optional': Arg(
('--email',),
help='Email of the user',
type=str),
'password': Arg(
('--password',),
help='Password of the user, required to create a user '
'without --use_random_password',
type=str),
'use_random_password': Arg(
('--use_random_password',),
            help='Do not prompt for password. Use a random string instead. '
                 'Required to create a user without --password',
default=False,
action='store_true'),
'user_import': Arg(
("import",),
metavar="FILEPATH",
help="Import users from JSON file. Example format::\n" +
textwrap.indent(textwrap.dedent('''
[
{
"email": "foo@bar.org",
"firstname": "Jon",
"lastname": "Doe",
"roles": ["Public"],
"username": "jondoe"
}
]'''), " " * 4),
),
'user_export': Arg(
("export",),
metavar="FILEPATH",
help="Export all users to JSON file"),
# roles
'create_role': Arg(
('-c', '--create'),
help='Create a new role',
action='store_true'),
'list_roles': Arg(
('-l', '--list'),
help='List roles',
action='store_true'),
'roles': Arg(
('role',),
help='The name of a role',
nargs='*'),
'autoscale': Arg(
('-a', '--autoscale'),
help="Minimum and Maximum number of worker to autoscale"),
'skip_serve_logs': Arg(
("-s", "--skip_serve_logs"),
default=False,
help="Don't start the serve logs process along with the workers",
action="store_true"),
}
DAGS_SUBCOMMANDS = (
{
'func': lazy_load_command('airflow.cli.commands.dag_command.dag_list_dags'),
'name': 'list',
'help': "List all the DAGs",
'args': ('subdir', 'report'),
},
{
'func': lazy_load_command('airflow.cli.commands.dag_command.dag_list_dag_runs'),
'name': 'list_runs',
'help': "List dag runs given a DAG id. If state option is given, it will only "
"search for all the dagruns with the given state. "
"If no_backfill option is given, it will filter out "
"all backfill dagruns for given dag id",
'args': ('dag_id', 'no_backfill', 'state', 'output',),
},
{
'func': lazy_load_command('airflow.cli.commands.dag_command.dag_list_jobs'),
'name': 'list_jobs',
'help': "List the jobs",
'args': ('dag_id_opt', 'state', 'limit', 'output',),
},
{
'func': lazy_load_command('airflow.cli.commands.dag_command.dag_state'),
'name': 'state',
'help': "Get the status of a dag run",
'args': ('dag_id', 'execution_date', 'subdir'),
},
{
'func': lazy_load_command('airflow.cli.commands.dag_command.dag_next_execution'),
'name': 'next_execution',
'help': "Get the next execution datetime of a DAG",
'args': ('dag_id', 'subdir'),
},
{
'func': lazy_load_command('airflow.cli.commands.dag_command.dag_pause'),
'name': 'pause',
'help': 'Pause a DAG',
'args': ('dag_id', 'subdir'),
},
{
'func': lazy_load_command('airflow.cli.commands.dag_command.dag_unpause'),
'name': 'unpause',
'help': 'Resume a paused DAG',
'args': ('dag_id', 'subdir'),
},
{
'func': lazy_load_command('airflow.cli.commands.dag_command.dag_trigger'),
'name': 'trigger',
'help': 'Trigger a DAG run',
'args': ('dag_id', 'subdir', 'run_id', 'conf', 'exec_date'),
},
{
'func': lazy_load_command('airflow.cli.commands.dag_command.dag_delete'),
'name': 'delete',
'help': "Delete all DB records related to the specified DAG",
'args': ('dag_id', 'yes'),
},
{
'func': lazy_load_command('airflow.cli.commands.dag_command.dag_show'),
'name': 'show',
'help': "Displays DAG's tasks with their dependencies",
'args': ('dag_id', 'subdir', 'save', 'imgcat',),
},
{
'func': lazy_load_command('airflow.cli.commands.dag_command.dag_backfill'),
'name': 'backfill',
'help': "Run subsections of a DAG for a specified date range. "
"If reset_dag_run option is used,"
" backfill will first prompt users whether airflow "
"should clear all the previous dag_run and task_instances "
"within the backfill date range. "
"If rerun_failed_tasks is used, backfill "
"will auto re-run the previous failed task instances"
" within the backfill date range",
'args': (
'dag_id', 'task_regex', 'start_date', 'end_date',
'mark_success', 'local', 'donot_pickle', 'yes',
'bf_ignore_dependencies', 'bf_ignore_first_depends_on_past',
'subdir', 'pool', 'delay_on_limit', 'dry_run', 'verbose', 'conf',
'reset_dag_run', 'rerun_failed_tasks', 'run_backwards'
),
},
)
TASKS_COMMANDS = (
{
'func': lazy_load_command('airflow.cli.commands.task_command.task_list'),
'name': 'list',
'help': "List the tasks within a DAG",
'args': ('dag_id', 'tree', 'subdir'),
},
{
'func': lazy_load_command('airflow.cli.commands.task_command.task_clear'),
'name': 'clear',
'help': "Clear a set of task instance, as if they never ran",
'args': (
'dag_id', 'task_regex', 'start_date', 'end_date', 'subdir',
'upstream', 'downstream', 'yes', 'only_failed',
'only_running', 'exclude_subdags', 'exclude_parentdag', 'dag_regex'),
},
{
'func': lazy_load_command('airflow.cli.commands.task_command.task_state'),
'name': 'state',
'help': "Get the status of a task instance",
'args': ('dag_id', 'task_id', 'execution_date', 'subdir'),
},
{
'func': lazy_load_command('airflow.cli.commands.task_command.task_failed_deps'),
'name': 'failed_deps',
'help': (
"Returns the unmet dependencies for a task instance from the perspective "
"of the scheduler. In other words, why a task instance doesn't get "
"scheduled and then queued by the scheduler, and then run by an "
"executor)"),
'args': ('dag_id', 'task_id', 'execution_date', 'subdir'),
},
{
'func': lazy_load_command('airflow.cli.commands.task_command.task_render'),
'name': 'render',
'help': "Render a task instance's template(s)",
'args': ('dag_id', 'task_id', 'execution_date', 'subdir'),
},
{
'func': lazy_load_command('airflow.cli.commands.task_command.task_run'),
'name': 'run',
'help': "Run a single task instance",
'args': (
'dag_id', 'task_id', 'execution_date', 'subdir',
'mark_success', 'force', 'pool', 'cfg_path',
'local', 'raw', 'ignore_all_dependencies', 'ignore_dependencies',
'ignore_depends_on_past', 'ship_dag', 'pickle', 'job_id', 'interactive',),
},
{
'func': lazy_load_command('airflow.cli.commands.task_command.task_test'),
'name': 'test',
'help': (
"Test a task instance. This will run a task without checking for "
"dependencies or recording its state in the database"),
'args': (
'dag_id', 'task_id', 'execution_date', 'subdir', 'dry_run',
'task_params', 'post_mortem'),
},
{
'func': lazy_load_command('airflow.cli.commands.task_command.task_states_for_dag_run'),
'name': 'states_for_dag_run',
'help': "Get the status of all task instances in a dag run",
'args': (
'dag_id', 'execution_date', 'output'),
},
)
POOLS_COMMANDS = (
{
'func': lazy_load_command('airflow.cli.commands.pool_command.pool_list'),
'name': 'list',
'help': 'List pools',
'args': ('output',),
},
{
'func': lazy_load_command('airflow.cli.commands.pool_command.pool_get'),
'name': 'get',
'help': 'Get pool size',
'args': ('pool_name', 'output',),
},
{
'func': lazy_load_command('airflow.cli.commands.pool_command.pool_set'),
'name': 'set',
'help': 'Configure pool',
'args': ('pool_name', 'pool_slots', 'pool_description', 'output',),
},
{
'func': lazy_load_command('airflow.cli.commands.pool_command.pool_delete'),
'name': 'delete',
'help': 'Delete pool',
'args': ('pool_name', 'output',),
},
{
'func': lazy_load_command('airflow.cli.commands.pool_command.pool_import'),
'name': 'import',
'help': 'Import pools',
'args': ('pool_import', 'output',),
},
{
'func': lazy_load_command('airflow.cli.commands.pool_command.pool_export'),
'name': 'export',
'help': 'Export all pools',
'args': ('pool_export', 'output',),
},
)
VARIABLES_COMMANDS = (
{
'func': lazy_load_command('airflow.cli.commands.variable_command.variables_list'),
'name': 'list',
'help': 'List variables',
'args': (),
},
{
'func': lazy_load_command('airflow.cli.commands.variable_command.variables_get'),
'name': 'get',
'help': 'Get variable',
'args': ('var', 'json', 'default'),
},
{
'func': lazy_load_command('airflow.cli.commands.variable_command.variables_set'),
'name': 'set',
'help': 'Set variable',
'args': ('var', 'var_value', 'json'),
},
{
'func': lazy_load_command('airflow.cli.commands.variable_command.variables_delete'),
'name': 'delete',
'help': 'Delete variable',
'args': ('var',),
},
{
'func': lazy_load_command('airflow.cli.commands.variable_command.variables_import'),
'name': 'import',
'help': 'Import variables',
'args': ('var_import',),
},
{
'func': lazy_load_command('airflow.cli.commands.variable_command.variables_export'),
'name': 'export',
'help': 'Export all variables',
'args': ('var_export',),
},
)
DB_COMMANDS = (
{
'func': lazy_load_command('airflow.cli.commands.db_command.initdb'),
'name': 'init',
'help': "Initialize the metadata database",
'args': (),
},
{
'func': lazy_load_command('airflow.cli.commands.db_command.resetdb'),
'name': 'reset',
'help': "Burn down and rebuild the metadata database",
'args': ('yes',),
},
{
'func': lazy_load_command('airflow.cli.commands.db_command.upgradedb'),
'name': 'upgrade',
'help': "Upgrade the metadata database to latest version",
'args': tuple(),
},
{
'func': lazy_load_command('airflow.cli.commands.db_command.shell'),
'name': 'shell',
'help': "Runs a shell to access the database",
'args': tuple(),
},
{
'func': lazy_load_command('airflow.cli.commands.db_command.check'),
'name': 'check',
'help': "Check if the database can be reached.",
'args': tuple(),
},
)
CONNECTIONS_COMMANDS = (
{
'func': lazy_load_command('airflow.cli.commands.connection_command.connections_list'),
'name': 'list',
'help': 'List connections',
'args': ('output',),
},
{
'func': lazy_load_command('airflow.cli.commands.connection_command.connections_add'),
'name': 'add',
'help': 'Add a connection',
'args': ('conn_id', 'conn_uri', 'conn_extra') + tuple(alternative_conn_specs),
},
{
'func': lazy_load_command('airflow.cli.commands.connection_command.connections_delete'),
'name': 'delete',
'help': 'Delete a connection',
'args': ('conn_id',),
},
)
    USERS_COMMANDS = (
{
'func': lazy_load_command('airflow.cli.commands.user_command.users_list'),
'name': 'list',
'help': 'List users',
'args': ('output',),
},
{
'func': lazy_load_command('airflow.cli.commands.user_command.users_create'),
'name': 'create',
'help': 'Create a user',
'args': ('role', 'username', 'email', 'firstname', 'lastname', 'password',
'use_random_password')
},
{
'func': lazy_load_command('airflow.cli.commands.user_command.users_delete'),
'name': 'delete',
'help': 'Delete a user',
'args': ('username',),
},
{
'func': lazy_load_command('airflow.cli.commands.user_command.add_role'),
'name': 'add_role',
'help': 'Add role to a user',
'args': ('username_optional', 'email_optional', 'role'),
},
{
'func': lazy_load_command('airflow.cli.commands.user_command.remove_role'),
'name': 'remove_role',
'help': 'Remove role from a user',
'args': ('username_optional', 'email_optional', 'role'),
},
{
'func': lazy_load_command('airflow.cli.commands.user_command.users_import'),
'name': 'import',
'help': 'Import users',
'args': ('user_import',),
},
{
'func': lazy_load_command('airflow.cli.commands.user_command.users_export'),
'name': 'export',
'help': 'Export all users',
'args': ('user_export',),
},
)
ROLES_COMMANDS = (
{
'func': lazy_load_command('airflow.cli.commands.role_command.roles_list'),
'name': 'list',
'help': 'List roles',
'args': ('output',),
},
{
'func': lazy_load_command('airflow.cli.commands.role_command.roles_create'),
'name': 'create',
'help': 'Create role',
'args': ('roles',),
},
)
subparsers = [
{
'help': 'List and manage DAGs',
'name': 'dags',
'subcommands': DAGS_SUBCOMMANDS,
}, {
'help': 'List and manage tasks',
'name': 'tasks',
'subcommands': TASKS_COMMANDS,
}, {
'help': "CRUD operations on pools",
'name': 'pools',
'subcommands': POOLS_COMMANDS,
}, {
'help': "CRUD operations on variables",
'name': 'variables',
'subcommands': VARIABLES_COMMANDS,
"args": ('set', 'get', 'json', 'default',
'var_import', 'var_export', 'var_delete'),
}, {
'help': "Database operations",
'name': 'db',
'subcommands': DB_COMMANDS,
}, {
'name': 'kerberos',
'func': lazy_load_command('airflow.cli.commands.kerberos_command.kerberos'),
'help': "Start a kerberos ticket renewer",
'args': ('principal', 'keytab', 'pid',
'daemon', 'stdout', 'stderr', 'log_file'),
}, {
'name': 'webserver',
'func': lazy_load_command('airflow.cli.commands.webserver_command.webserver'),
'help': "Start a Airflow webserver instance",
'args': ('port', 'workers', 'workerclass', 'worker_timeout', 'hostname',
'pid', 'daemon', 'stdout', 'stderr', 'access_logfile',
'error_logfile', 'log_file', 'ssl_cert', 'ssl_key', 'debug'),
}, {
'name': 'scheduler',
'func': lazy_load_command('airflow.cli.commands.scheduler_command.scheduler'),
'help': "Start a scheduler instance",
'args': ('dag_id_opt', 'subdir', 'num_runs',
'do_pickle', 'pid', 'daemon', 'stdout', 'stderr',
'log_file'),
}, {
'name': 'version',
'func': lazy_load_command('airflow.cli.commands.version_command.version'),
'help': "Show the version",
'args': tuple(),
}, {
'help': "List/Add/Delete connections",
'name': 'connections',
'subcommands': CONNECTIONS_COMMANDS,
}, {
'help': "CRUD operations on users",
'name': 'users',
            'subcommands': USERS_COMMANDS,
}, {
'help': 'Create/List roles',
'name': 'roles',
'subcommands': ROLES_COMMANDS,
}, {
'name': 'sync_perm',
'func': lazy_load_command('airflow.cli.commands.sync_perm_command.sync_perm'),
'help': "Update permissions for existing roles and DAGs",
'args': tuple(),
},
{
'name': 'rotate_fernet_key',
'func': lazy_load_command('airflow.cli.commands.rotate_fernet_key_command.rotate_fernet_key'),
'help': 'Rotate all encrypted connection credentials and variables; see '
'https://airflow.readthedocs.io/en/stable/howto/secure-connections.html'
'#rotating-encryption-keys',
'args': (),
},
{
'name': 'config',
'func': lazy_load_command('airflow.cli.commands.config_command.show_config'),
'help': 'Show current application configuration',
'args': (),
},
]
if conf.get("core", "EXECUTOR") == ExecutorLoader.CELERY_EXECUTOR or BUILD_DOCS:
subparsers.append({
"help": "Start celery components",
"name": "celery",
"subcommands": (
{
'name': 'worker',
'func': lazy_load_command('airflow.cli.commands.celery_command.worker'),
'help': "Start a Celery worker node",
'args': ('do_pickle', 'queues', 'concurrency', 'celery_hostname',
'pid', 'daemon', 'stdout', 'stderr', 'log_file', 'autoscale', 'skip_serve_logs'),
}, {
'name': 'flower',
'func': lazy_load_command('airflow.cli.commands.celery_command.flower'),
'help': "Start a Celery Flower",
'args': (
'flower_hostname', 'flower_port', 'flower_conf', 'flower_url_prefix',
'flower_basic_auth', 'broker_api', 'pid', 'daemon', 'stdout', 'stderr', 'log_file'),
},
{
'name': 'stop',
'func': lazy_load_command('airflow.cli.commands.celery_command.stop_worker'),
'help': "Stop the Celery worker gracefully",
'args': (),
}
)
})
subparsers_dict = {sp.get('name') or sp['func'].__name__: sp for sp in subparsers} # type: ignore
dag_subparsers = (
'list_tasks', 'backfill', 'test', 'run', 'pause', 'unpause', 'list_dag_runs')
@classmethod
def get_parser(cls, dag_parser=False):
"""Creates and returns command line argument parser"""
class DefaultHelpParser(argparse.ArgumentParser):
"""Override argparse.ArgumentParser.error and use print_help instead of print_usage"""
def error(self, message):
self.print_help()
self.exit(2, '\n{} command error: {}, see help above.\n'.format(self.prog, message))
parser = DefaultHelpParser()
subparsers = parser.add_subparsers(
help='sub-command help', dest='subcommand')
subparsers.required = True
subparser_list = cls.dag_subparsers if dag_parser else cls.subparsers_dict.keys()
for sub in sorted(subparser_list):
sub = cls.subparsers_dict[sub]
cls._add_subcommand(subparsers, sub)
return parser
@classmethod
def sort_args(cls, args: Arg):
"""
Sort subcommand optional args, keep positional args
"""
def partition(pred, iterable):
"""
Use a predicate to partition entries into false entries and true entries
"""
iter_1, iter_2 = tee(iterable)
return filterfalse(pred, iter_1), filter(pred, iter_2)
def get_long_option(arg):
"""
Get long option from Arg.flags
"""
return cls.args[arg].flags[0] if len(cls.args[arg].flags) == 1 else cls.args[arg].flags[1]
positional, optional = partition(lambda x: cls.args[x].flags[0].startswith("-"), args)
yield from positional
yield from sorted(optional, key=lambda x: get_long_option(x).lower())
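    # A small illustration of the resulting order (assuming these keys exist in
    # cls.args, as they do above): positional args are yielded first in their
    # original order, then optional args sorted by their long flag.
    #
    #   list(CLIFactory.sort_args(('dag_id', 'subdir', 'run_id', 'conf')))
    #   # -> ['dag_id', 'conf', 'run_id', 'subdir']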
@classmethod
def _add_subcommand(cls, subparsers, sub):
dag_parser = False
sub_proc = subparsers.add_parser(
sub.get('name') or sub['func'].__name__, help=sub['help'] # type: ignore
)
sub_proc.formatter_class = RawTextHelpFormatter
subcommands = sub.get('subcommands', [])
if subcommands:
sub_subparsers = sub_proc.add_subparsers(dest='subcommand')
sub_subparsers.required = True
for command in sorted(subcommands, key=lambda x: x['name']):
cls._add_subcommand(sub_subparsers, command)
else:
for arg in cls.sort_args(sub['args']):
if 'dag_id' in arg and dag_parser:
continue
arg = cls.args[arg]
kwargs = {
f: v
for f, v in vars(arg).items() if f != 'flags' and v}
sub_proc.add_argument(*arg.flags, **kwargs)
sub_proc.set_defaults(func=sub['func'])
def get_parser():
"""Calls static method inside factory which creates argument parser"""
return CLIFactory.get_parser()
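# A minimal usage sketch (illustrative only; the argv shown is hypothetical):
# the generated parser stores the lazily loaded command under 'func', so an
# entrypoint can simply dispatch to it.
#
#   parser = get_parser()
#   args = parser.parse_args(['dags', 'list'])
#   args.func(args)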