blob: 82ffc41783126d4256dbfff2731b41691ace9bcb [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations
import os
from functools import lru_cache
from typing import Iterable
import jinja2
import rich_click as click
from docutils import nodes
from docutils.nodes import Element
# No stub exists for docutils.parsers.rst.directives. See https://github.com/python/typeshed/issues/5755.
from docutils.parsers.rst import Directive, directives # type: ignore[attr-defined]
from docutils.statemachine import StringList
from provider_yaml_utils import get_provider_yaml_paths, load_package_data
from sphinx.util import nested_parse_with_titles
from sphinx.util.docutils import switch_source_input
CMD_OPERATORS_AND_HOOKS = "operators-and-hooks"
CMD_TRANSFERS = "transfers"
"""
Directives for rendering tables with operators.
To test the template rendering process, you can also run this script as a standalone program.
PYTHONPATH=$PWD/../ python exts/operators_and_hooks_ref.py --help
"""
DEFAULT_HEADER_SEPARATOR = "="
CURRENT_DIR = os.path.dirname(__file__)
ROOT_DIR = os.path.abspath(os.path.join(CURRENT_DIR, os.pardir, os.pardir))
DOCS_DIR = os.path.join(ROOT_DIR, "docs")
@lru_cache(maxsize=None)
def _get_jinja_env():
loader = jinja2.FileSystemLoader(CURRENT_DIR, followlinks=True)
env = jinja2.Environment(loader=loader, undefined=jinja2.StrictUndefined)
return env
def _render_template(template_name, **kwargs):
return _get_jinja_env().get_template(template_name).render(**kwargs)
def _docs_path(filepath: str):
if not filepath.startswith("/docs/"):
raise Exception(f"The path must starts with '/docs/'. Current value: {filepath}")
if not filepath.endswith(".rst"):
raise Exception(f"The path must ends with '.rst'. Current value: {filepath}")
if filepath.startswith("/docs/apache-airflow-providers-"):
_, _, provider, rest = filepath.split("/", maxsplit=3)
filepath = f"{provider}:{rest}"
else:
filepath = os.path.join(ROOT_DIR, filepath.lstrip("/"))
filepath = os.path.relpath(filepath, DOCS_DIR)
len_rst = len(".rst")
filepath = filepath[:-len_rst]
return filepath
def _prepare_resource_index(package_data, resource_type):
return {
integration["integration-name"]: {**integration, "package-name": provider["package-name"]}
for provider in package_data
for integration in provider.get(resource_type, [])
}
def _prepare_operators_data(tags: set[str] | None):
package_data = load_package_data()
all_integrations = _prepare_resource_index(package_data, "integrations")
if tags is None:
to_display_integration = all_integrations.values()
else:
to_display_integration = [
integration for integration in all_integrations.values() if tags.intersection(integration["tags"])
]
all_operators_by_integration = _prepare_resource_index(package_data, "operators")
all_hooks_by_integration = _prepare_resource_index(package_data, "hooks")
all_sensors_by_integration = _prepare_resource_index(package_data, "hooks")
results = []
for integration in to_display_integration:
item = {
"integration": integration,
}
operators = all_operators_by_integration.get(integration["integration-name"])
sensors = all_sensors_by_integration.get(integration["integration-name"])
hooks = all_hooks_by_integration.get(integration["integration-name"])
if "how-to-guide" in item["integration"]:
item["integration"]["how-to-guide"] = [_docs_path(d) for d in item["integration"]["how-to-guide"]]
if operators:
item["operators"] = operators
if sensors:
item["hooks"] = sensors
if hooks:
item["hooks"] = hooks
if operators or sensors or hooks:
results.append(item)
return sorted(results, key=lambda d: d["integration"]["integration-name"].lower())
def _render_operator_content(*, tags: set[str] | None, header_separator: str):
tabular_data = _prepare_operators_data(tags)
return _render_template(
"operators_and_hooks_ref.rst.jinja2", items=tabular_data, header_separator=header_separator
)
def _prepare_transfer_data(tags: set[str] | None):
package_data = load_package_data()
all_operators_by_integration = _prepare_resource_index(package_data, "integrations")
# Add edge case
for name in ["SQL", "Local"]:
all_operators_by_integration[name] = {"integration-name": name}
all_transfers = [
{
**transfer,
"package-name": provider["package-name"],
"source-integration": all_operators_by_integration[transfer["source-integration-name"]],
"target-integration": all_operators_by_integration[transfer["target-integration-name"]],
}
for provider in package_data
for transfer in provider.get("transfers", [])
]
if tags is None:
to_display_transfers = all_transfers
else:
to_display_transfers = [
transfer
for transfer in all_transfers
if tags.intersection(transfer["source-integration"].get("tags", set()))
or tags.intersection(transfer["target-integration"].get("tags", set()))
]
for transfer in to_display_transfers:
if "how-to-guide" not in transfer:
continue
transfer["how-to-guide"] = _docs_path(transfer["how-to-guide"])
return to_display_transfers
def _render_transfer_content(*, tags: set[str] | None, header_separator: str):
tabular_data = _prepare_transfer_data(tags)
return _render_template(
"operators_and_hooks_ref-transfers.rst.jinja2", items=tabular_data, header_separator=header_separator
)
def _prepare_logging_data():
package_data = load_package_data()
all_logging = {}
for provider in package_data:
logging_handlers = provider.get("logging")
if logging_handlers:
package_name = provider["package-name"]
all_logging[package_name] = {"name": provider["name"], "handlers": logging_handlers}
return all_logging
def _render_logging_content(*, header_separator: str):
tabular_data = _prepare_logging_data()
return _render_template("logging.rst.jinja2", items=tabular_data, header_separator=header_separator)
def _prepare_auth_backend_data():
package_data = load_package_data()
all_auth_backends = {}
for provider in package_data:
auth_backends_list = provider.get("auth-backends")
if auth_backends_list:
package_name = provider["package-name"]
all_auth_backends[package_name] = {"name": provider["name"], "auth_backends": auth_backends_list}
return all_auth_backends
def _render_auth_backend_content(*, header_separator: str):
tabular_data = _prepare_auth_backend_data()
return _render_template("auth_backend.rst.jinja2", items=tabular_data, header_separator=header_separator)
def _prepare_secrets_backend_data():
package_data = load_package_data()
all_secret_backends = {}
for provider in package_data:
secret_backends_list = provider.get("secrets-backends")
if secret_backends_list:
package_name = provider["package-name"]
all_secret_backends[package_name] = {
"name": provider["name"],
"secrets_backends": secret_backends_list,
}
return all_secret_backends
def _render_secrets_backend_content(*, header_separator: str):
tabular_data = _prepare_secrets_backend_data()
return _render_template(
"secret_backend.rst.jinja2", items=tabular_data, header_separator=header_separator
)
def _prepare_connections_data():
package_data = load_package_data()
all_connections = {}
for provider in package_data:
connections_list = provider.get("connection-types")
if connections_list:
package_name = provider["package-name"]
all_connections[package_name] = {
"name": provider["name"],
"connection_types": connections_list,
}
return all_connections
def _render_connections_content(*, header_separator: str):
tabular_data = _prepare_connections_data()
return _render_template("connections.rst.jinja2", items=tabular_data, header_separator=header_separator)
def _prepare_extra_links_data():
package_data = load_package_data()
all_extra_links = {}
for provider in package_data:
extra_link_list = provider.get("extra-links")
if extra_link_list:
package_name = provider["package-name"]
all_extra_links[package_name] = {
"name": provider["name"],
"extra_links": extra_link_list,
}
return all_extra_links
def _render_extra_links_content(*, header_separator: str):
tabular_data = _prepare_extra_links_data()
return _render_template("extra_links.rst.jinja2", items=tabular_data, header_separator=header_separator)
def _prepare_notifications_data():
package_data = load_package_data()
all_notifiers = {}
for provider in package_data:
notifications = provider.get("notifications")
if notifications:
package_name = provider["package-name"]
all_notifiers[package_name] = {
"name": provider["name"],
"notifications": notifications,
}
return all_notifiers
def _render_notification_content(*, header_separator: str):
tabular_data = _prepare_notifications_data()
return _render_template("notifications.rst.jinja2", items=tabular_data, header_separator=header_separator)
class BaseJinjaReferenceDirective(Directive):
"""The base directive for OperatorsHooksReferenceDirective and TransfersReferenceDirective"""
optional_arguments = 1
option_spec = {"tags": directives.unchanged, "header-separator": directives.unchanged_required}
def run(self):
tags_arg = self.options.get("tags")
tags = {t.strip() for t in tags_arg.split(",")} if tags_arg else None
header_separator = self.options.get("header-separator")
new_content = self.render_content(tags=tags, header_separator=header_separator)
with switch_source_input(self.state, self.content):
new_content = StringList(new_content.splitlines(), source="")
node: Element = nodes.section()
# necessary so that the child nodes get the right source/line set
node.document = self.state.document
nested_parse_with_titles(self.state, new_content, node)
# record all filenames as dependencies -- this will at least
# partially make automatic invalidation possible
for filepath in get_provider_yaml_paths():
self.state.document.settings.record_dependencies.add(filepath)
return node.children
def render_content(self, *, tags: set[str] | None, header_separator: str = DEFAULT_HEADER_SEPARATOR):
"""Return content in RST format"""
raise NotImplementedError("Tou need to override render_content method.")
class OperatorsHooksReferenceDirective(BaseJinjaReferenceDirective):
"""Generates a list of operators, sensors, hooks"""
def render_content(self, *, tags: set[str] | None, header_separator: str = DEFAULT_HEADER_SEPARATOR):
return _render_operator_content(
tags=tags,
header_separator=header_separator,
)
class TransfersReferenceDirective(BaseJinjaReferenceDirective):
"""Generate a list of transfer operators"""
def render_content(self, *, tags: set[str] | None, header_separator: str = DEFAULT_HEADER_SEPARATOR):
return _render_transfer_content(
tags=tags,
header_separator=header_separator,
)
class LoggingDirective(BaseJinjaReferenceDirective):
"""Generate list of logging handlers"""
def render_content(self, *, tags: set[str] | None, header_separator: str = DEFAULT_HEADER_SEPARATOR):
return _render_logging_content(
header_separator=header_separator,
)
class AuthBackendDirective(BaseJinjaReferenceDirective):
"""Generate list of auth backend handlers"""
def render_content(self, *, tags: set[str] | None, header_separator: str = DEFAULT_HEADER_SEPARATOR):
return _render_auth_backend_content(
header_separator=header_separator,
)
class SecretsBackendDirective(BaseJinjaReferenceDirective):
"""Generate list of secret backend handlers"""
def render_content(self, *, tags: set[str] | None, header_separator: str = DEFAULT_HEADER_SEPARATOR):
return _render_secrets_backend_content(
header_separator=header_separator,
)
class ConnectionsDirective(BaseJinjaReferenceDirective):
"""Generate list of connections"""
def render_content(self, *, tags: set[str] | None, header_separator: str = DEFAULT_HEADER_SEPARATOR):
return _render_connections_content(
header_separator=header_separator,
)
class ExtraLinksDirective(BaseJinjaReferenceDirective):
"""Generate list of extra links"""
def render_content(self, *, tags: set[str] | None, header_separator: str = DEFAULT_HEADER_SEPARATOR):
return _render_extra_links_content(
header_separator=header_separator,
)
class NotificationsDirective(BaseJinjaReferenceDirective):
"""Generate list of notifiers"""
def render_content(self, *, tags: set[str] | None, header_separator: str = DEFAULT_HEADER_SEPARATOR):
return _render_notification_content(
header_separator=header_separator,
)
def setup(app):
"""Setup plugin"""
app.add_directive("operators-hooks-ref", OperatorsHooksReferenceDirective)
app.add_directive("transfers-ref", TransfersReferenceDirective)
app.add_directive("airflow-logging", LoggingDirective)
app.add_directive("airflow-auth-backends", AuthBackendDirective)
app.add_directive("airflow-secrets-backends", SecretsBackendDirective)
app.add_directive("airflow-connections", ConnectionsDirective)
app.add_directive("airflow-extra-links", ExtraLinksDirective)
app.add_directive("airflow-notifications", NotificationsDirective)
return {"parallel_read_safe": True, "parallel_write_safe": True}
option_tag = click.option(
"--tag",
multiple=True,
help="If passed, displays integrations that have a matching tag",
)
option_header_separator = click.option(
"--header-separator", default=DEFAULT_HEADER_SEPARATOR, show_default=True
)
@click.group(context_settings={"help_option_names": ["-h", "--help"], "max_content_width": 500})
def cli():
"""Render tables with integrations"""
@cli.command()
@option_tag
@option_header_separator
def operators_and_hooks(tag: Iterable[str], header_separator: str):
"""Renders Operators ahd Hooks content"""
print(_render_operator_content(tags=set(tag) if tag else None, header_separator=header_separator))
@cli.command()
@option_tag
@option_header_separator
def transfers(tag: Iterable[str], header_separator: str):
"""Renders Transfers content"""
print(_render_transfer_content(tags=set(tag) if tag else None, header_separator=header_separator))
@cli.command()
@option_header_separator
def logging(header_separator: str):
"""Renders Logger content"""
print(_render_logging_content(header_separator=header_separator))
@cli.command()
@option_header_separator
def auth_backends(header_separator: str):
"""Renders Logger content"""
print(_render_auth_backend_content(header_separator=header_separator))
@cli.command()
@option_header_separator
def secret_backends(header_separator: str):
"""Renders Secret Backends content"""
print(_render_secrets_backend_content(header_separator=header_separator))
@cli.command()
@option_header_separator
def connections(header_separator: str):
"""Renders Connections content"""
print(_render_connections_content(header_separator=header_separator))
@cli.command()
@option_header_separator
def extra_links(header_separator: str):
"""Renders Extra links content"""
print(_render_extra_links_content(header_separator=header_separator))
if __name__ == "__main__":
cli()