blob: 59b889564cb85f64e1771cfb0fd18fb7d64b7fb9 [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""Providers sub-commands."""
from __future__ import annotations
import sys
import re2
from airflow.cli.simple_table import AirflowConsole
from airflow.providers_manager import ProvidersManager
from airflow.utils.cli import suppress_logs_and_warning
from airflow.utils.providers_configuration_loader import providers_configuration_loaded
ERROR_IMPORTING_HOOK = "Error when importing hook!"
def _remove_rst_syntax(value: str) -> str:
return re2.sub("[`_<>]", "", value.strip(" \n."))
@suppress_logs_and_warning
@providers_configuration_loaded
def provider_get(args):
"""Get a provider info."""
providers = ProvidersManager().providers
if args.provider_name in providers:
provider_version = providers[args.provider_name].version
provider_info = providers[args.provider_name].data
if args.full:
provider_info["description"] = _remove_rst_syntax(provider_info["description"])
AirflowConsole().print_as(
data=[provider_info],
output=args.output,
)
else:
AirflowConsole().print_as(
data=[{"Provider": args.provider_name, "Version": provider_version}], output=args.output
)
else:
raise SystemExit(f"No such provider installed: {args.provider_name}")
@suppress_logs_and_warning
@providers_configuration_loaded
def providers_list(args):
"""List all providers at the command line."""
AirflowConsole().print_as(
data=list(ProvidersManager().providers.values()),
output=args.output,
mapper=lambda x: {
"package_name": x.data["package-name"],
"description": _remove_rst_syntax(x.data["description"]),
"version": x.version,
},
)
@suppress_logs_and_warning
@providers_configuration_loaded
def hooks_list(args):
"""List all hooks at the command line."""
AirflowConsole().print_as(
data=list(ProvidersManager().hooks.items()),
output=args.output,
mapper=lambda x: {
"connection_type": x[0],
"class": x[1].hook_class_name if x[1] else ERROR_IMPORTING_HOOK,
"conn_id_attribute_name": x[1].connection_id_attribute_name if x[1] else ERROR_IMPORTING_HOOK,
"package_name": x[1].package_name if x[1] else ERROR_IMPORTING_HOOK,
"hook_name": x[1].hook_name if x[1] else ERROR_IMPORTING_HOOK,
},
)
@suppress_logs_and_warning
@providers_configuration_loaded
def triggers_list(args):
AirflowConsole().print_as(
data=ProvidersManager().trigger,
output=args.output,
mapper=lambda x: {
"package_name": x.package_name,
"class": x.trigger_class_name,
"integration_name": x.integration_name,
},
)
@suppress_logs_and_warning
@providers_configuration_loaded
def notifications_list(args):
AirflowConsole().print_as(
data=ProvidersManager().notification,
output=args.output,
mapper=lambda x: {
"notification_class_name": x,
},
)
@suppress_logs_and_warning
@providers_configuration_loaded
def connection_form_widget_list(args):
"""List all custom connection form fields at the command line."""
AirflowConsole().print_as(
data=sorted(ProvidersManager().connection_form_widgets.items()),
output=args.output,
mapper=lambda x: {
"connection_parameter_name": x[0],
"class": x[1].hook_class_name,
"package_name": x[1].package_name,
"field_type": x[1].field.field_class.__name__,
},
)
@suppress_logs_and_warning
@providers_configuration_loaded
def connection_field_behaviours(args):
"""List field behaviours."""
AirflowConsole().print_as(
data=list(ProvidersManager().field_behaviours),
output=args.output,
mapper=lambda x: {
"field_behaviours": x,
},
)
@suppress_logs_and_warning
@providers_configuration_loaded
def extra_links_list(args):
"""List all extra links at the command line."""
AirflowConsole().print_as(
data=ProvidersManager().extra_links_class_names,
output=args.output,
mapper=lambda x: {
"extra_link_class_name": x,
},
)
@suppress_logs_and_warning
@providers_configuration_loaded
def logging_list(args):
"""List all log task handlers at the command line."""
AirflowConsole().print_as(
data=list(ProvidersManager().logging_class_names),
output=args.output,
mapper=lambda x: {
"logging_class_name": x,
},
)
@suppress_logs_and_warning
@providers_configuration_loaded
def secrets_backends_list(args):
"""List all secrets backends at the command line."""
AirflowConsole().print_as(
data=list(ProvidersManager().secrets_backend_class_names),
output=args.output,
mapper=lambda x: {
"secrets_backend_class_name": x,
},
)
@suppress_logs_and_warning
@providers_configuration_loaded
def auth_backend_list(args):
"""List all API auth backend modules at the command line."""
AirflowConsole().print_as(
data=list(ProvidersManager().auth_backend_module_names),
output=args.output,
mapper=lambda x: {
"api_auth_backend_module": x,
},
)
@suppress_logs_and_warning
@providers_configuration_loaded
def auth_managers_list(args):
"""List all auth managers at the command line."""
AirflowConsole().print_as(
data=list(ProvidersManager().auth_managers),
output=args.output,
mapper=lambda x: {
"auth_managers_module": x,
},
)
@suppress_logs_and_warning
@providers_configuration_loaded
def executors_list(args):
"""List all executors at the command line."""
AirflowConsole().print_as(
data=list(ProvidersManager().executor_class_names),
output=args.output,
mapper=lambda x: {
"executor_class_names": x,
},
)
@suppress_logs_and_warning
@providers_configuration_loaded
def config_list(args):
"""List all configurations at the command line."""
AirflowConsole().print_as(
data=list(ProvidersManager().provider_configs),
output=args.output,
mapper=lambda x: {
"provider_config": x,
},
)
@suppress_logs_and_warning
def lazy_loaded(args):
"""
Informs if providers manager has been initialized too early.
If provider is initialized, shows the stack trace and exit with error code 1.
"""
import rich
if ProvidersManager.initialized():
rich.print(
"\n[red]ProvidersManager was initialized during CLI parsing. This should not happen.\n",
file=sys.stderr,
)
rich.print(
"\n[yellow]Please make sure no Providers Manager initialization happens during CLI parsing.\n",
file=sys.stderr,
)
rich.print("Stack trace where it has been initialized:\n", file=sys.stderr)
rich.print(ProvidersManager.initialization_stack_trace(), file=sys.stderr)
sys.exit(1)
else:
rich.print("[green]All ok. Providers Manager was not initialized during the CLI parsing.")
sys.exit(0)