blob: e324053c15b6d35518095b96c25ae49fda8903a9 [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations
import ast
import os
from functools import lru_cache
from pathlib import Path
from typing import Any, Iterable, Iterator
import jinja2
import rich_click as click
import yaml
from docutils import nodes
from docutils.nodes import Element
# No stub exists for docutils.parsers.rst.directives. See https://github.com/python/typeshed/issues/5755.
from docutils.parsers.rst import Directive, directives # type: ignore[attr-defined]
from docutils.statemachine import StringList
from provider_yaml_utils import get_provider_yaml_paths, load_package_data
from sphinx.util import nested_parse_with_titles
from sphinx.util.docutils import switch_source_input
CMD_OPERATORS_AND_HOOKS = "operators-and-hooks"
CMD_TRANSFERS = "transfers"
"""
Directives for rendering tables with operators.
To test the template rendering process, you can also run this script as a standalone program.
PYTHONPATH=$PWD/../ python exts/operators_and_hooks_ref.py --help
"""
DEFAULT_HEADER_SEPARATOR = "="
CURRENT_DIR = Path(os.path.dirname(__file__))
TEMPLATE_DIR = CURRENT_DIR / "templates"
ROOT_DIR = os.path.abspath(os.path.join(CURRENT_DIR, os.pardir, os.pardir))
DOCS_DIR = os.path.join(ROOT_DIR, "docs")
@lru_cache(maxsize=None)
def _get_jinja_env():
loader = jinja2.FileSystemLoader(TEMPLATE_DIR, followlinks=True)
env = jinja2.Environment(loader=loader, undefined=jinja2.StrictUndefined)
return env
def _render_template(template_name, **kwargs):
return _get_jinja_env().get_template(template_name).render(**kwargs)
def _docs_path(filepath: str):
if not filepath.startswith("/docs/"):
raise Exception(f"The path must starts with '/docs/'. Current value: {filepath}")
if not filepath.endswith(".rst"):
raise Exception(f"The path must ends with '.rst'. Current value: {filepath}")
if filepath.startswith("/docs/apache-airflow-providers-"):
_, _, provider, rest = filepath.split("/", maxsplit=3)
filepath = f"{provider}:{rest}"
else:
filepath = os.path.join(ROOT_DIR, filepath.lstrip("/"))
filepath = os.path.relpath(filepath, DOCS_DIR)
len_rst = len(".rst")
filepath = filepath[:-len_rst]
return filepath
def _prepare_resource_index(package_data, resource_type):
return {
integration["integration-name"]: {**integration, "package-name": provider["package-name"]}
for provider in package_data
for integration in provider.get(resource_type, [])
}
def _prepare_operators_data(tags: set[str] | None):
package_data = load_package_data()
all_integrations = _prepare_resource_index(package_data, "integrations")
if tags is None:
to_display_integration = all_integrations.values()
else:
to_display_integration = [
integration for integration in all_integrations.values() if tags.intersection(integration["tags"])
]
all_operators_by_integration = _prepare_resource_index(package_data, "operators")
all_hooks_by_integration = _prepare_resource_index(package_data, "hooks")
all_sensors_by_integration = _prepare_resource_index(package_data, "sensors")
results = []
for integration in to_display_integration:
item = {
"integration": integration,
}
operators = all_operators_by_integration.get(integration["integration-name"])
sensors = all_sensors_by_integration.get(integration["integration-name"])
hooks = all_hooks_by_integration.get(integration["integration-name"])
if "how-to-guide" in item["integration"]:
item["integration"]["how-to-guide"] = [_docs_path(d) for d in item["integration"]["how-to-guide"]]
if operators:
item["operators"] = operators
if sensors:
item["sensors"] = sensors
if hooks:
item["hooks"] = hooks
if operators or sensors or hooks:
results.append(item)
return sorted(results, key=lambda d: d["integration"]["integration-name"].lower())
def _render_operator_content(*, tags: set[str] | None, header_separator: str):
tabular_data = _prepare_operators_data(tags)
return _render_template(
"operators_and_hooks_ref.rst.jinja2", items=tabular_data, header_separator=header_separator
)
def _prepare_transfer_data(tags: set[str] | None):
package_data = load_package_data()
all_operators_by_integration = _prepare_resource_index(package_data, "integrations")
# Add edge case
for name in ["SQL", "Local"]:
all_operators_by_integration[name] = {"integration-name": name}
all_transfers = [
{
**transfer,
"package-name": provider["package-name"],
"source-integration": all_operators_by_integration[transfer["source-integration-name"]],
"target-integration": all_operators_by_integration[transfer["target-integration-name"]],
}
for provider in package_data
for transfer in provider.get("transfers", [])
]
if tags is None:
to_display_transfers = all_transfers
else:
to_display_transfers = [
transfer
for transfer in all_transfers
if tags.intersection(transfer["source-integration"].get("tags", set()))
or tags.intersection(transfer["target-integration"].get("tags", set()))
]
for transfer in to_display_transfers:
if "how-to-guide" not in transfer:
continue
transfer["how-to-guide"] = _docs_path(transfer["how-to-guide"])
return to_display_transfers
def _render_transfer_content(*, tags: set[str] | None, header_separator: str):
tabular_data = _prepare_transfer_data(tags)
return _render_template(
"operators_and_hooks_ref-transfers.rst.jinja2", items=tabular_data, header_separator=header_separator
)
def iter_deferrable_operators(module_filename: str) -> Iterator[tuple[str, str]]:
ast_obj = ast.parse(open(module_filename).read())
cls_nodes = (node for node in ast.iter_child_nodes(ast_obj) if isinstance(node, ast.ClassDef))
init_method_nodes = (
(cls_node, node)
for cls_node in cls_nodes
for node in ast.iter_child_nodes(cls_node)
if isinstance(node, ast.FunctionDef) and node.name == "__init__"
)
for cls_node, node in init_method_nodes:
args = node.args
for argument in [*args.args, *args.kwonlyargs]:
if argument.arg == "deferrable":
module_name = module_filename.replace("/", ".")[:-3]
op_name = cls_node.name
yield (module_name, op_name)
def _render_deferrable_operator_content(*, header_separator: str):
providers = []
for provider_yaml_path in get_provider_yaml_paths():
provider_parent_path = Path(provider_yaml_path).parent
provider_info: dict[str, Any] = {"name": "", "operators": []}
for root, _, file_names in os.walk(provider_parent_path):
if all([target not in root for target in ["operators", "sensors"]]):
continue
for file_name in file_names:
if not file_name.endswith(".py") or file_name == "__init__.py":
continue
provider_info["operators"].extend(
iter_deferrable_operators(f"{os.path.relpath(root)}/{file_name}")
)
if provider_info["operators"]:
provider_yaml_content = yaml.safe_load(Path(provider_yaml_path).read_text())
provider_info["name"] = provider_yaml_content["package-name"]
providers.append(provider_info)
return _render_template("deferrable_operators_list.rst.jinja2", providers=providers)
class BaseJinjaReferenceDirective(Directive):
"""The base directive for OperatorsHooksReferenceDirective and TransfersReferenceDirective"""
optional_arguments = 1
option_spec = {"tags": directives.unchanged, "header-separator": directives.unchanged_required}
def run(self):
tags_arg = self.options.get("tags")
tags = {t.strip() for t in tags_arg.split(",")} if tags_arg else None
header_separator = self.options.get("header-separator")
new_content = self.render_content(tags=tags, header_separator=header_separator)
with switch_source_input(self.state, self.content):
new_content = StringList(new_content.splitlines(), source="")
node: Element = nodes.section()
# necessary so that the child nodes get the right source/line set
node.document = self.state.document
nested_parse_with_titles(self.state, new_content, node)
# record all filenames as dependencies -- this will at least
# partially make automatic invalidation possible
for filepath in get_provider_yaml_paths():
self.state.document.settings.record_dependencies.add(filepath)
return node.children
def render_content(self, *, tags: set[str] | None, header_separator: str = DEFAULT_HEADER_SEPARATOR):
"""Return content in RST format"""
raise NotImplementedError("You need to override render_content method.")
def _common_render_list_content(*, header_separator: str, resource_type: str, template: str):
tabular_data = {
provider["package-name"]: {
"name": provider["name"],
resource_type: provider.get(resource_type, []),
}
for provider in load_package_data()
if provider.get(resource_type) is not None
}
return _render_template(template, items=tabular_data, header_separator=header_separator)
class OperatorsHooksReferenceDirective(BaseJinjaReferenceDirective):
"""Generates a list of operators, sensors, hooks"""
def render_content(
self, *, tags: set[str] | None, header_separator: str = DEFAULT_HEADER_SEPARATOR
) -> str:
return _render_operator_content(
tags=tags,
header_separator=header_separator,
)
class TransfersReferenceDirective(BaseJinjaReferenceDirective):
"""Generate a list of transfer operators"""
def render_content(
self, *, tags: set[str] | None, header_separator: str = DEFAULT_HEADER_SEPARATOR
) -> str:
return _render_transfer_content(
tags=tags,
header_separator=header_separator,
)
class LoggingDirective(BaseJinjaReferenceDirective):
"""Generate list of logging handlers"""
def render_content(
self, *, tags: set[str] | None, header_separator: str = DEFAULT_HEADER_SEPARATOR
) -> str:
return _common_render_list_content(
header_separator=header_separator, resource_type="logging", template="logging.rst.jinja2"
)
class AuthBackendDirective(BaseJinjaReferenceDirective):
"""Generate list of auth backend handlers"""
def render_content(
self, *, tags: set[str] | None, header_separator: str = DEFAULT_HEADER_SEPARATOR
) -> str:
return _common_render_list_content(
header_separator=header_separator,
resource_type="auth-backends",
template="auth_backend.rst.jinja2",
)
class AuthConfigurations(BaseJinjaReferenceDirective):
"""Generate list of configurations"""
def render_content(
self, *, tags: set[str] | None, header_separator: str = DEFAULT_HEADER_SEPARATOR
) -> str:
tabular_data = [
provider["package-name"] for provider in load_package_data() if provider.get("config") is not None
]
return _render_template(
"configuration.rst.jinja2", items=tabular_data, header_separator=header_separator
)
class SecretsBackendDirective(BaseJinjaReferenceDirective):
"""Generate list of secret backend handlers"""
def render_content(
self, *, tags: set[str] | None, header_separator: str = DEFAULT_HEADER_SEPARATOR
) -> str:
return _common_render_list_content(
header_separator=header_separator,
resource_type="secrets-backends",
template="secret_backend.rst.jinja2",
)
class ConnectionsDirective(BaseJinjaReferenceDirective):
"""Generate list of connections"""
def render_content(
self, *, tags: set[str] | None, header_separator: str = DEFAULT_HEADER_SEPARATOR
) -> str:
return _common_render_list_content(
header_separator=header_separator,
resource_type="connection-types",
template="connections.rst.jinja2",
)
class ExtraLinksDirective(BaseJinjaReferenceDirective):
"""Generate list of extra links"""
def render_content(
self, *, tags: set[str] | None, header_separator: str = DEFAULT_HEADER_SEPARATOR
) -> str:
return _common_render_list_content(
header_separator=header_separator, resource_type="extra-links", template="extra_links.rst.jinja2"
)
class NotificationsDirective(BaseJinjaReferenceDirective):
"""Generate list of notifiers"""
def render_content(
self, *, tags: set[str] | None, header_separator: str = DEFAULT_HEADER_SEPARATOR
) -> str:
return _common_render_list_content(
header_separator=header_separator,
resource_type="notifications",
template="notifications.rst.jinja2",
)
class ExecutorsDirective(BaseJinjaReferenceDirective):
"""Generate list of executors"""
def render_content(
self, *, tags: set[str] | None, header_separator: str = DEFAULT_HEADER_SEPARATOR
) -> str:
return _common_render_list_content(
header_separator=header_separator, resource_type="executors", template="executors.rst.jinja2"
)
class DeferrableOperatorDirective(BaseJinjaReferenceDirective):
"""Generate list of deferrable operators"""
def render_content(self, *, tags: set[str] | None, header_separator: str = DEFAULT_HEADER_SEPARATOR):
return _render_deferrable_operator_content(
header_separator=header_separator,
)
def setup(app):
"""Setup plugin"""
app.add_directive("operators-hooks-ref", OperatorsHooksReferenceDirective)
app.add_directive("transfers-ref", TransfersReferenceDirective)
app.add_directive("airflow-logging", LoggingDirective)
app.add_directive("airflow-auth-backends", AuthBackendDirective)
app.add_directive("airflow-configurations", AuthConfigurations)
app.add_directive("airflow-secrets-backends", SecretsBackendDirective)
app.add_directive("airflow-connections", ConnectionsDirective)
app.add_directive("airflow-extra-links", ExtraLinksDirective)
app.add_directive("airflow-notifications", NotificationsDirective)
app.add_directive("airflow-executors", ExecutorsDirective)
app.add_directive("airflow-deferrable-operators", DeferrableOperatorDirective)
return {"parallel_read_safe": True, "parallel_write_safe": True}
option_tag = click.option(
"--tag",
multiple=True,
help="If passed, displays integrations that have a matching tag",
)
option_header_separator = click.option(
"--header-separator", default=DEFAULT_HEADER_SEPARATOR, show_default=True
)
@click.group(context_settings={"help_option_names": ["-h", "--help"], "max_content_width": 500})
def cli():
"""Render tables with integrations"""
@cli.command()
@option_tag
@option_header_separator
def operators_and_hooks(tag: Iterable[str], header_separator: str):
"""Renders Operators ahd Hooks content"""
print(_render_operator_content(tags=set(tag) if tag else None, header_separator=header_separator))
@cli.command()
@option_tag
@option_header_separator
def transfers(tag: Iterable[str], header_separator: str):
"""Renders Transfers content"""
print(_render_transfer_content(tags=set(tag) if tag else None, header_separator=header_separator))
@cli.command()
@option_header_separator
def logging(header_separator: str):
"""Renders Logger content"""
print(
_common_render_list_content(
header_separator=header_separator, resource_type="logging", template="logging.rst.jinja2"
)
)
@cli.command()
@option_header_separator
def auth_backends(header_separator: str):
"""Renders Logger content"""
print(
_common_render_list_content(
header_separator=header_separator,
resource_type="auth-backends",
template="auth_backend.rst.jinja2",
)
)
@cli.command()
@option_header_separator
def secret_backends(header_separator: str):
"""Renders Secret Backends content"""
print(
_common_render_list_content(
header_separator=header_separator,
resource_type="secrets-backends",
template="secret_backend.rst.jinja2",
)
)
@cli.command()
@option_header_separator
def connections(header_separator: str):
"""Renders Connections content"""
print(
_common_render_list_content(
header_separator=header_separator,
resource_type="connection-types",
template="connections.rst.jinja2",
)
)
@cli.command()
@option_header_separator
def extra_links(header_separator: str):
"""Renders Extra links content"""
print(
_common_render_list_content(
header_separator=header_separator, resource_type="extra-links", template="extra_links.rst.jinja2"
)
)
@cli.command()
@option_tag
@option_header_separator
def deferrable_operators(tag: Iterable[str], header_separator: str):
"""Renders Deferrable Operators content"""
print(_render_deferrable_operator_content(header_separator=header_separator))
if __name__ == "__main__":
cli()