blob: fccc99197f055aae34eae77acd622a138d54f441 [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import os
from functools import lru_cache
from typing import Optional, Set
import jinja2
from docutils import nodes
from docutils.nodes import Element
from docutils.parsers.rst import Directive, directives
from docutils.statemachine import StringList
from provider_yaml_utils import ( # pylint: disable=no-name-in-module
get_provider_yaml_paths,
load_package_data,
)
from sphinx.util import nested_parse_with_titles
from sphinx.util.docutils import switch_source_input
CMD_OPERATORS_AND_HOOKS = "operators-and-hooks"
CMD_TRANSFERS = 'transfers'
"""
Directives for rendering tables with operators.
To test the template rendering process, you can also run this script as a standalone program.
PYTHONPATH=$PWD/../ python exts/operators_and_hooks_ref.py --help
"""
DEFAULT_HEADER_SEPARATOR = "="
CURRENT_DIR = os.path.dirname(__file__)
ROOT_DIR = os.path.abspath(os.path.join(CURRENT_DIR, os.pardir, os.pardir))
DOCS_DIR = os.path.join(ROOT_DIR, 'docs')
@lru_cache(maxsize=None)
def _get_jinja_env():
loader = jinja2.FileSystemLoader(CURRENT_DIR, followlinks=True)
env = jinja2.Environment(loader=loader, undefined=jinja2.StrictUndefined)
return env
def _render_template(template_name, **kwargs):
return _get_jinja_env().get_template(template_name).render(**kwargs)
def _docs_path(filepath: str):
if not filepath.startswith("/docs/"):
raise Exception(f"The path must starts with '/docs/'. Current value: {filepath}")
if not filepath.endswith(".rst"):
raise Exception(f"The path must ends with '.rst'. Current value: {filepath}")
if filepath.startswith("/docs/apache-airflow-providers-"):
_, _, provider, rest = filepath.split("/", maxsplit=3)
filepath = f"{provider}:{rest}"
else:
filepath = os.path.join(ROOT_DIR, filepath.lstrip('/'))
filepath = os.path.relpath(filepath, DOCS_DIR)
len_rst = len(".rst")
filepath = filepath[:-len_rst]
return filepath
def _prepare_resource_index(package_data, resource_type):
return {
integration["integration-name"]: {**integration, 'package-name': provider['package-name']}
for provider in package_data
for integration in provider.get(resource_type, [])
}
def _prepare_operators_data(tags: Optional[Set[str]]):
package_data = load_package_data()
all_integrations = _prepare_resource_index(package_data, "integrations")
if tags is None:
to_display_integration = all_integrations
else:
to_display_integration = [
integration for integration in all_integrations.values() if tags.intersection(integration["tags"])
]
all_operators_by_integration = _prepare_resource_index(package_data, "operators")
all_hooks_by_integration = _prepare_resource_index(package_data, "hooks")
all_sensors_by_integration = _prepare_resource_index(package_data, "hooks")
results = []
for integration in to_display_integration:
item = {
"integration": integration,
}
operators = all_operators_by_integration.get(integration['integration-name'])
sensors = all_sensors_by_integration.get(integration['integration-name'])
hooks = all_hooks_by_integration.get(integration['integration-name'])
if 'how-to-guide' in item['integration']:
item['integration']['how-to-guide'] = [_docs_path(d) for d in item['integration']['how-to-guide']]
if operators:
item['operators'] = operators
if sensors:
item['hooks'] = sensors
if hooks:
item['hooks'] = hooks
if operators or sensors or hooks:
results.append(item)
return sorted(results, key=lambda d: d["integration"]["integration-name"].lower())
def _render_operator_content(*, tags: Optional[Set[str]], header_separator: str = DEFAULT_HEADER_SEPARATOR):
tabular_data = _prepare_operators_data(tags)
return _render_template(
"operators_and_hooks_ref.rst.jinja2", items=tabular_data, header_separator=header_separator
)
def _prepare_transfer_data(tags: Optional[Set[str]]):
package_data = load_package_data()
all_operators_by_integration = _prepare_resource_index(package_data, "integrations")
# Add edge case
for name in ["SQL", "Local"]:
all_operators_by_integration[name] = {"integration-name": name}
all_transfers = [
{
**transfer,
'package-name': provider['package-name'],
'source-integration': all_operators_by_integration[transfer['source-integration-name']],
'target-integration': all_operators_by_integration[transfer['target-integration-name']],
}
for provider in package_data
for transfer in provider.get("transfers", [])
]
if tags is None:
to_display_transfers = all_transfers
else:
to_display_transfers = [
transfer
for transfer in all_transfers
if tags.intersection(transfer['source-integration'].get('tags', set()))
or tags.intersection(transfer['target-integration'].get('tags', set()))
]
for transfer in to_display_transfers:
if 'how-to-guide' not in transfer:
continue
transfer['how-to-guide'] = _docs_path(transfer['how-to-guide'])
return to_display_transfers
def _render_transfer_content(*, tags: Optional[Set[str]], header_separator: str = DEFAULT_HEADER_SEPARATOR):
tabular_data = _prepare_transfer_data(tags)
return _render_template(
"operators_and_hooks_ref-transfers.rst.jinja2", items=tabular_data, header_separator=header_separator
)
class BaseJinjaReferenceDirective(Directive):
"""The base directive for OperatorsHooksReferenceDirective and TransfersReferenceDirective"""
optional_arguments = 1
option_spec = {"tags": directives.unchanged, 'header-separator': directives.unchanged_required}
def run(self):
tags_arg = self.options.get("tags")
tags = {t.strip() for t in tags_arg.split(",")} if tags_arg else None
header_separator = self.options.get('header-separator')
new_content = self.render_content(tags=tags, header_separator=header_separator)
with switch_source_input(self.state, self.content):
new_content = StringList(new_content.splitlines(), source='')
node = nodes.section() # type: Element
# necessary so that the child nodes get the right source/line set
node.document = self.state.document
nested_parse_with_titles(self.state, new_content, node)
# record all filenames as dependencies -- this will at least
# partially make automatic invalidation possible
for filepath in get_provider_yaml_paths():
self.state.document.settings.record_dependencies.add(filepath)
return node.children
def render_content(self, *, tags: Optional[Set[str]], header_separator: str = DEFAULT_HEADER_SEPARATOR):
"""Return content in RST format"""
raise NotImplementedError("Tou need to override render_content method.")
class OperatorsHooksReferenceDirective(BaseJinjaReferenceDirective):
"""Generates a list of operators, sensors, hooks"""
def render_content(self, *, tags: Optional[Set[str]], header_separator: str = DEFAULT_HEADER_SEPARATOR):
return _render_operator_content(
tags=tags,
header_separator=header_separator,
)
class TransfersReferenceDirective(BaseJinjaReferenceDirective):
"""Generate a list of transfer operators"""
def render_content(self, *, tags: Optional[Set[str]], header_separator: str = DEFAULT_HEADER_SEPARATOR):
return _render_transfer_content(
tags=tags,
header_separator=header_separator,
)
def setup(app):
"""Setup plugin"""
app.add_directive('operators-hooks-ref', OperatorsHooksReferenceDirective)
app.add_directive('transfers-ref', TransfersReferenceDirective)
return {'parallel_read_safe': True, 'parallel_write_safe': True}
if __name__ == "__main__":
import argparse
parser = argparse.ArgumentParser(description='Render tables with integrations.')
parser.add_argument(
'--tag',
dest='tags',
action="append",
help='If passed, displays integrations that have a matching tag.',
)
parser.add_argument('--header-separator', default=DEFAULT_HEADER_SEPARATOR)
subparsers = parser.add_subparsers(help='sub-command help', metavar="COMMAND")
subparsers.required = True
parser_a = subparsers.add_parser(CMD_OPERATORS_AND_HOOKS)
parser_a.set_defaults(cmd=CMD_OPERATORS_AND_HOOKS)
parser_b = subparsers.add_parser(CMD_TRANSFERS)
parser_b.set_defaults(cmd=CMD_TRANSFERS)
args = parser.parse_args()
if args.cmd == CMD_OPERATORS_AND_HOOKS:
content = _render_operator_content(
tags=set(args.tags) if args.tags else None, header_separator=args.header_separator
)
else:
content = _render_transfer_content(
tags=set(args.tags) if args.tags else None, header_separator=args.header_separator
)
print(content)