blob: 431d5fe55afe18a347397cca4fa3b80a5a365d0c [file] [log] [blame]
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""Manages all plugins."""
import importlib
import importlib.machinery
import importlib.util
import inspect
import logging
import os
import sys
import types
from typing import TYPE_CHECKING, Any, Dict, Iterable, List, Optional, Type
try:
import importlib_metadata
except ImportError:
from importlib import metadata as importlib_metadata # type: ignore[no-redef]
from types import ModuleType
from airflow import settings
from airflow.utils.entry_points import entry_points_with_dist
from airflow.utils.file import find_path_from_directory
from airflow.utils.module_loading import as_importable_string
if TYPE_CHECKING:
from airflow.hooks.base import BaseHook
from airflow.listeners.listener import ListenerManager
from airflow.timetables.base import Timetable
log = logging.getLogger(__name__)
import_errors: Dict[str, str] = {}
plugins = None # type: Optional[List[AirflowPlugin]]
# Plugin components to integrate as modules
registered_hooks: Optional[List['BaseHook']] = None
macros_modules: Optional[List[Any]] = None
executors_modules: Optional[List[Any]] = None
# Plugin components to integrate directly
admin_views: Optional[List[Any]] = None
flask_blueprints: Optional[List[Any]] = None
menu_links: Optional[List[Any]] = None
flask_appbuilder_views: Optional[List[Any]] = None
flask_appbuilder_menu_links: Optional[List[Any]] = None
global_operator_extra_links: Optional[List[Any]] = None
operator_extra_links: Optional[List[Any]] = None
registered_operator_link_classes: Optional[Dict[str, Type]] = None
registered_ti_dep_classes: Optional[Dict[str, Type]] = None
timetable_classes: Optional[Dict[str, Type["Timetable"]]] = None
"""Mapping of class names to class of OperatorLinks registered by plugins.
Used by the DAG serialization code to only allow specific classes to be created
during deserialization
"""
PLUGINS_ATTRIBUTES_TO_DUMP = {
"hooks",
"executors",
"macros",
"flask_blueprints",
"appbuilder_views",
"appbuilder_menu_items",
"global_operator_extra_links",
"operator_extra_links",
"ti_deps",
"timetables",
"source",
"listeners",
}
class AirflowPluginSource:
"""Class used to define an AirflowPluginSource."""
def __str__(self):
raise NotImplementedError
def __html__(self):
raise NotImplementedError
class PluginsDirectorySource(AirflowPluginSource):
"""Class used to define Plugins loaded from Plugins Directory."""
def __init__(self, path):
self.path = os.path.relpath(path, settings.PLUGINS_FOLDER)
def __str__(self):
return f"$PLUGINS_FOLDER/{self.path}"
def __html__(self):
return f"<em>$PLUGINS_FOLDER/</em>{self.path}"
class EntryPointSource(AirflowPluginSource):
"""Class used to define Plugins loaded from entrypoint."""
def __init__(self, entrypoint: importlib_metadata.EntryPoint, dist: importlib_metadata.Distribution):
self.dist = dist.metadata['Name']
self.version = dist.version
self.entrypoint = str(entrypoint)
def __str__(self):
return f"{self.dist}=={self.version}: {self.entrypoint}"
def __html__(self):
return f"<em>{self.dist}=={self.version}:</em> {self.entrypoint}"
class AirflowPluginException(Exception):
"""Exception when loading plugin."""
class AirflowPlugin:
"""Class used to define AirflowPlugin."""
name: Optional[str] = None
source: Optional[AirflowPluginSource] = None
hooks: List[Any] = []
executors: List[Any] = []
macros: List[Any] = []
admin_views: List[Any] = []
flask_blueprints: List[Any] = []
menu_links: List[Any] = []
appbuilder_views: List[Any] = []
appbuilder_menu_items: List[Any] = []
# A list of global operator extra links that can redirect users to
# external systems. These extra links will be available on the
# task page in the form of buttons.
#
# Note: the global operator extra link can be overridden at each
# operator level.
global_operator_extra_links: List[Any] = []
# A list of operator extra links to override or add operator links
# to existing Airflow Operators.
# These extra links will be available on the task page in form of
# buttons.
operator_extra_links: List[Any] = []
ti_deps: List[Any] = []
# A list of timetable classes that can be used for DAG scheduling.
timetables: List[Type["Timetable"]] = []
listeners: List[ModuleType] = []
@classmethod
def validate(cls):
"""Validates that plugin has a name."""
if not cls.name:
raise AirflowPluginException("Your plugin needs a name.")
@classmethod
def on_load(cls, *args, **kwargs):
"""
Executed when the plugin is loaded.
This method is only called once during runtime.
:param args: If future arguments are passed in on call.
:param kwargs: If future arguments are passed in on call.
"""
def is_valid_plugin(plugin_obj):
"""
Check whether a potential object is a subclass of
the AirflowPlugin class.
:param plugin_obj: potential subclass of AirflowPlugin
:return: Whether or not the obj is a valid subclass of
AirflowPlugin
"""
global plugins
if (
inspect.isclass(plugin_obj)
and issubclass(plugin_obj, AirflowPlugin)
and (plugin_obj is not AirflowPlugin)
):
plugin_obj.validate()
return plugin_obj not in plugins
return False
def register_plugin(plugin_instance):
"""
Start plugin load and register it after success initialization
:param plugin_instance: subclass of AirflowPlugin
"""
global plugins
plugin_instance.on_load()
plugins.append(plugin_instance)
def load_entrypoint_plugins():
"""
Load and register plugins AirflowPlugin subclasses from the entrypoints.
The entry_point group should be 'airflow.plugins'.
"""
global import_errors
log.debug("Loading plugins from entrypoints")
for entry_point, dist in entry_points_with_dist('airflow.plugins'):
log.debug('Importing entry_point plugin %s', entry_point.name)
try:
plugin_class = entry_point.load()
if not is_valid_plugin(plugin_class):
continue
plugin_instance = plugin_class()
plugin_instance.source = EntryPointSource(entry_point, dist)
register_plugin(plugin_instance)
except Exception as e:
log.exception("Failed to import plugin %s", entry_point.name)
import_errors[entry_point.module] = str(e)
def load_plugins_from_plugin_directory():
"""Load and register Airflow Plugins from plugins directory"""
global import_errors
log.debug("Loading plugins from directory: %s", settings.PLUGINS_FOLDER)
for file_path in find_path_from_directory(settings.PLUGINS_FOLDER, ".airflowignore"):
if not os.path.isfile(file_path):
continue
mod_name, file_ext = os.path.splitext(os.path.split(file_path)[-1])
if file_ext != '.py':
continue
try:
loader = importlib.machinery.SourceFileLoader(mod_name, file_path)
spec = importlib.util.spec_from_loader(mod_name, loader)
mod = importlib.util.module_from_spec(spec)
sys.modules[spec.name] = mod
loader.exec_module(mod)
log.debug('Importing plugin module %s', file_path)
for mod_attr_value in (m for m in mod.__dict__.values() if is_valid_plugin(m)):
plugin_instance = mod_attr_value()
plugin_instance.source = PluginsDirectorySource(file_path)
register_plugin(plugin_instance)
except Exception as e:
log.exception('Failed to import plugin %s', file_path)
import_errors[file_path] = str(e)
def make_module(name: str, objects: List[Any]):
"""Creates new module."""
if not objects:
return None
log.debug('Creating module %s', name)
name = name.lower()
module = types.ModuleType(name)
module._name = name.split('.')[-1] # type: ignore
module._objects = objects # type: ignore
module.__dict__.update((o.__name__, o) for o in objects)
return module
def ensure_plugins_loaded():
"""
Load plugins from plugins directory and entrypoints.
Plugins are only loaded if they have not been previously loaded.
"""
from airflow.stats import Stats
global plugins, registered_hooks
if plugins is not None:
log.debug("Plugins are already loaded. Skipping.")
return
if not settings.PLUGINS_FOLDER:
raise ValueError("Plugins folder is not set")
log.debug("Loading plugins")
with Stats.timer() as timer:
plugins = []
registered_hooks = []
load_plugins_from_plugin_directory()
load_entrypoint_plugins()
# We don't do anything with these for now, but we want to keep track of
# them so we can integrate them in to the UI's Connection screens
for plugin in plugins:
registered_hooks.extend(plugin.hooks)
num_loaded = len(plugins)
if num_loaded > 0:
log.debug("Loading %d plugin(s) took %.2f seconds", num_loaded, timer.duration)
def initialize_web_ui_plugins():
"""Collect extension points for WEB UI"""
global plugins
global flask_blueprints
global flask_appbuilder_views
global flask_appbuilder_menu_links
if (
flask_blueprints is not None
and flask_appbuilder_views is not None
and flask_appbuilder_menu_links is not None
):
return
ensure_plugins_loaded()
if plugins is None:
raise AirflowPluginException("Can't load plugins.")
log.debug("Initialize Web UI plugin")
flask_blueprints = []
flask_appbuilder_views = []
flask_appbuilder_menu_links = []
for plugin in plugins:
flask_appbuilder_views.extend(plugin.appbuilder_views)
flask_appbuilder_menu_links.extend(plugin.appbuilder_menu_items)
flask_blueprints.extend([{'name': plugin.name, 'blueprint': bp} for bp in plugin.flask_blueprints])
if (plugin.admin_views and not plugin.appbuilder_views) or (
plugin.menu_links and not plugin.appbuilder_menu_items
):
log.warning(
"Plugin \'%s\' may not be compatible with the current Airflow version. "
"Please contact the author of the plugin.",
plugin.name,
)
def initialize_ti_deps_plugins():
"""Creates modules for loaded extension from custom task instance dependency rule plugins"""
global registered_ti_dep_classes
if registered_ti_dep_classes is not None:
return
ensure_plugins_loaded()
if plugins is None:
raise AirflowPluginException("Can't load plugins.")
log.debug("Initialize custom taskinstance deps plugins")
registered_ti_dep_classes = {}
for plugin in plugins:
registered_ti_dep_classes.update(
{as_importable_string(ti_dep.__class__): ti_dep.__class__ for ti_dep in plugin.ti_deps}
)
def initialize_extra_operators_links_plugins():
"""Creates modules for loaded extension from extra operators links plugins"""
global global_operator_extra_links
global operator_extra_links
global registered_operator_link_classes
if (
global_operator_extra_links is not None
and operator_extra_links is not None
and registered_operator_link_classes is not None
):
return
ensure_plugins_loaded()
if plugins is None:
raise AirflowPluginException("Can't load plugins.")
log.debug("Initialize extra operators links plugins")
global_operator_extra_links = []
operator_extra_links = []
registered_operator_link_classes = {}
for plugin in plugins:
global_operator_extra_links.extend(plugin.global_operator_extra_links)
operator_extra_links.extend(list(plugin.operator_extra_links))
registered_operator_link_classes.update(
{as_importable_string(link.__class__): link.__class__ for link in plugin.operator_extra_links}
)
def initialize_timetables_plugins():
"""Collect timetable classes registered by plugins."""
global timetable_classes
if timetable_classes is not None:
return
ensure_plugins_loaded()
if plugins is None:
raise AirflowPluginException("Can't load plugins.")
log.debug("Initialize extra timetables plugins")
timetable_classes = {
as_importable_string(timetable_class): timetable_class
for plugin in plugins
for timetable_class in plugin.timetables
}
def integrate_executor_plugins() -> None:
"""Integrate executor plugins to the context."""
global plugins
global executors_modules
if executors_modules is not None:
return
ensure_plugins_loaded()
if plugins is None:
raise AirflowPluginException("Can't load plugins.")
log.debug("Integrate executor plugins")
executors_modules = []
for plugin in plugins:
if plugin.name is None:
raise AirflowPluginException("Invalid plugin name")
plugin_name: str = plugin.name
executors_module = make_module('airflow.executors.' + plugin_name, plugin.executors)
if executors_module:
executors_modules.append(executors_module)
sys.modules[executors_module.__name__] = executors_module
def integrate_macros_plugins() -> None:
"""Integrates macro plugins."""
global plugins
global macros_modules
from airflow import macros
if macros_modules is not None:
return
ensure_plugins_loaded()
if plugins is None:
raise AirflowPluginException("Can't load plugins.")
log.debug("Integrate DAG plugins")
macros_modules = []
for plugin in plugins:
if plugin.name is None:
raise AirflowPluginException("Invalid plugin name")
macros_module = make_module(f'airflow.macros.{plugin.name}', plugin.macros)
if macros_module:
macros_modules.append(macros_module)
sys.modules[macros_module.__name__] = macros_module
# Register the newly created module on airflow.macros such that it
# can be accessed when rendering templates.
setattr(macros, plugin.name, macros_module)
def integrate_listener_plugins(listener_manager: "ListenerManager") -> None:
global plugins
ensure_plugins_loaded()
if plugins:
for plugin in plugins:
if plugin.name is None:
raise AirflowPluginException("Invalid plugin name")
for listener in plugin.listeners:
listener_manager.add_listener(listener)
def get_plugin_info(attrs_to_dump: Optional[Iterable[str]] = None) -> List[Dict[str, Any]]:
"""
Dump plugins attributes
:param attrs_to_dump: A list of plugin attributes to dump
"""
ensure_plugins_loaded()
integrate_executor_plugins()
integrate_macros_plugins()
initialize_web_ui_plugins()
initialize_extra_operators_links_plugins()
if not attrs_to_dump:
attrs_to_dump = PLUGINS_ATTRIBUTES_TO_DUMP
plugins_info = []
if plugins:
for plugin in plugins:
info: Dict[str, Any] = {"name": plugin.name}
for attr in attrs_to_dump:
if attr in ('global_operator_extra_links', 'operator_extra_links'):
info[attr] = [
f'<{as_importable_string(d.__class__)} object>' for d in getattr(plugin, attr)
]
elif attr in ('macros', 'timetables', 'hooks', 'executors'):
info[attr] = [as_importable_string(d) for d in getattr(plugin, attr)]
elif attr == 'listeners':
# listeners are always modules
info[attr] = [d.__name__ for d in getattr(plugin, attr)]
elif attr == 'appbuilder_views':
info[attr] = [
{**d, 'view': as_importable_string(d['view'].__class__) if 'view' in d else None}
for d in getattr(plugin, attr)
]
elif attr == 'flask_blueprints':
info[attr] = [
(
f"<{as_importable_string(d.__class__)}: "
f"name={d.name!r} import_name={d.import_name!r}>"
)
for d in getattr(plugin, attr)
]
else:
info[attr] = getattr(plugin, attr)
plugins_info.append(info)
return plugins_info