# blob: 5fd680e7ed3e172c349574b141e06f29e41f7b06 [file] [log] [blame]
# -*- coding: utf-8 -*-
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals
from builtins import object
import imp
import inspect
import logging
import os
import re
from typing import Any, Dict, List, Type
import pkg_resources
from airflow import settings
from airflow.models.baseoperator import BaseOperatorLink
# Module-level logger used while discovering and loading plugins.
log = logging.getLogger(__name__)
# Failures recorded while importing plugins. Keys are either the path of the
# plugin file or the module name of the entry point; values are the
# stringified exception.
import_errors = {}  # type: Dict[str, str]
class AirflowPluginException(Exception):
    """Raised when a plugin is invalid or the plugin setup is unusable."""
class AirflowPlugin(object):
    """
    Base class that every Airflow plugin derives from.

    A plugin is declared by subclassing this class, giving it a ``name``
    and filling in whichever of the component lists below it provides.
    All lists default to empty.
    """

    # Mandatory, non-empty identifier of the plugin (checked by ``validate``).
    name = None  # type: str

    # Components contributed by the plugin, grouped by kind.
    operators = []  # type: List[Any]
    sensors = []  # type: List[Any]
    hooks = []  # type: List[Any]
    executors = []  # type: List[Any]
    macros = []  # type: List[Any]
    admin_views = []  # type: List[Any]
    flask_blueprints = []  # type: List[Any]
    menu_links = []  # type: List[Any]
    appbuilder_views = []  # type: List[Any]
    appbuilder_menu_items = []  # type: List[Any]

    # Global operator extra links that can redirect users to external
    # systems. They are shown as buttons on the task page.
    #
    # Note: a global operator extra link can be overridden at the level of
    # an individual operator.
    global_operator_extra_links = []  # type: List[BaseOperatorLink]

    # Operator extra links that override or extend the links of existing
    # Airflow operators. They are shown as buttons on the task page.
    operator_extra_links = []  # type: List[BaseOperatorLink]

    @classmethod
    def validate(cls):
        """
        Check that the plugin declares a name.

        :raises AirflowPluginException: if ``name`` is missing or empty.
        """
        if cls.name:
            return
        raise AirflowPluginException("Your plugin needs a name.")

    @classmethod
    def on_load(cls, *args, **kwargs):
        """
        Hook executed when the plugin is loaded.

        It is called only once during runtime. The default implementation
        does nothing; subclasses may override it.

        :param args: Accepted so that future positional arguments can be passed.
        :param kwargs: Accepted so that future keyword arguments can be passed.
        """
def load_entrypoint_plugins(entry_points, airflow_plugins):
    """
    Load AirflowPlugin subclasses from the entrypoints
    provided. The entry_point group should be 'airflow.plugins'.

    Every entry point is loaded independently: a failure is logged, recorded
    in ``import_errors`` under the entry point's module name, and does not
    prevent the remaining entry points from being processed.

    :param entry_points: A collection of entrypoints to search for plugins
    :type entry_points: Generator[setuptools.EntryPoint, None, None]
    :param airflow_plugins: A collection of existing airflow plugins to
        ensure we don't load duplicates. The list is extended in place.
    :type airflow_plugins: list[type[airflow.plugins_manager.AirflowPlugin]]
    :return: The same ``airflow_plugins`` list, including the new plugins.
    :rtype: list[type[airflow.plugins_manager.AirflowPlugin]]
    """
    # ``import_errors`` is only mutated, never rebound, so no ``global``
    # declaration is needed here.
    for entry_point in entry_points:
        log.debug('Importing entry_point plugin %s', entry_point.name)
        try:
            plugin_obj = entry_point.load()
            if not is_valid_plugin(plugin_obj, airflow_plugins):
                continue
            # The hook is optional: run it when it is callable, but register
            # the validated plugin either way.
            on_load = getattr(plugin_obj, 'on_load', None)
            if callable(on_load):
                on_load()
            airflow_plugins.append(plugin_obj)
        except Exception as e:  # pylint: disable=broad-except
            log.exception("Failed to import plugin %s", entry_point.name)
            import_errors[entry_point.module_name] = str(e)
    return airflow_plugins
def is_valid_plugin(plugin_obj, existing_plugins):
    """
    Tell whether an object is a new, usable AirflowPlugin subclass.

    An object qualifies when it is a class, derives from AirflowPlugin
    without being AirflowPlugin itself, passes its own ``validate()`` and is
    not already contained in ``existing_plugins``.

    :param plugin_obj: potential subclass of AirflowPlugin
    :param existing_plugins: Existing list of AirflowPlugin subclasses
    :return: Whether or not the obj is a valid subclass of
        AirflowPlugin that has not been collected yet
    :raises AirflowPluginException: propagated from ``validate()`` when the
        plugin class declares no name
    """
    if not inspect.isclass(plugin_obj):
        return False
    if not issubclass(plugin_obj, AirflowPlugin) or plugin_obj is AirflowPlugin:
        return False

    plugin_obj.validate()
    already_known = plugin_obj in existing_plugins
    return not already_known
# Plugin classes collected so far: first from the plugins folder, then from
# the 'airflow.plugins' entry points.
plugins = []  # type: List[AirflowPlugin]

# Matches the characters ('/', '|' and '.') of a directory path that are
# replaced when the path is turned into a module namespace.
norm_pattern = re.compile(r'[/|.]')

if settings.PLUGINS_FOLDER is None:
    raise AirflowPluginException("Plugins folder is not set")

# Walk the plugins folder (following symlinks), import every ``.py`` file and
# pick up the AirflowPlugin derivatives each imported module exposes.
for root, dirs, files in os.walk(settings.PLUGINS_FOLDER, followlinks=True):
    for f in files:
        try:
            filepath = os.path.join(root, f)
            if not os.path.isfile(filepath):
                continue

            mod_name, file_ext = os.path.splitext(os.path.basename(filepath))
            if file_ext != '.py':
                continue

            log.debug('Importing plugin module %s', filepath)
            # Build the module namespace from the normalized directory path
            # and the file's base name.
            namespace = '_'.join([norm_pattern.sub('__', root), mod_name])

            m = imp.load_source(namespace, filepath)
            # Iterate over a snapshot of the module's attributes.
            for obj in list(vars(m).values()):
                if is_valid_plugin(obj, plugins):
                    plugins.append(obj)
        except Exception as e:
            # One broken file must not abort the crawl: log it, remember the
            # error under the file path and carry on with the next file.
            log.exception(e)
            log.error('Failed to import plugin %s', filepath)
            import_errors[filepath] = str(e)

# Add the plugins that are published through setuptools entry points.
plugins = load_entrypoint_plugins(
    pkg_resources.iter_entry_points('airflow.plugins'), plugins)
def make_module(name, objects):
    """
    Create an in-memory module that exposes the given objects.

    The module is named after the lower-cased ``name``. Each object is made
    available as a module attribute under its own ``__name__``. In addition
    the module carries ``_name`` (the last dotted component of the
    lower-cased name) and ``_objects`` (the ``objects`` argument itself).

    :param name: Dotted name of the module to create
    :param objects: Objects to expose; each must have a ``__name__``
    :return: The newly created module
    """
    log.debug('Creating module %s', name)
    lowered = name.lower()
    module = imp.new_module(lowered)
    module._name = lowered.rsplit('.', 1)[-1]
    module._objects = objects
    namespace = vars(module)
    for member in objects:
        namespace[member.__name__] = member
    return module
# Plugin components that are exposed as generated modules (one per plugin)
operators_modules = []
sensors_modules = []
hooks_modules = []
executors_modules = []
macros_modules = []

# Plugin components that are collected directly into flat lists
admin_views = []  # type: List[Any]
flask_blueprints = []  # type: List[Any]
menu_links = []  # type: List[Any]
flask_appbuilder_views = []  # type: List[Any]
flask_appbuilder_menu_links = []  # type: List[Any]
global_operator_extra_links = []  # type: List[BaseOperatorLink]
operator_extra_links = []  # type: List[BaseOperatorLink]

registered_operator_link_classes = {}  # type: Dict[str, Type]
"""Mapping of class names to class of OperatorLinks registered by plugins.
Used by the DAG serialization code to only allow specific classes to be created
during deserialization
"""

for p in plugins:
    # The operators module of a plugin exposes its sensors as well.
    operators_modules.append(
        make_module('airflow.operators.' + p.name, p.operators + p.sensors)
    )
    sensors_modules.append(make_module('airflow.sensors.' + p.name, p.sensors))
    hooks_modules.append(make_module('airflow.hooks.' + p.name, p.hooks))
    executors_modules.append(
        make_module('airflow.executors.' + p.name, p.executors)
    )
    macros_modules.append(make_module('airflow.macros.' + p.name, p.macros))

    admin_views.extend(p.admin_views)
    menu_links.extend(p.menu_links)
    flask_appbuilder_views.extend(p.appbuilder_views)
    flask_appbuilder_menu_links.extend(p.appbuilder_menu_items)
    # Each blueprint is stored together with the name of its plugin.
    flask_blueprints.extend(
        [{'name': p.name, 'blueprint': bp} for bp in p.flask_blueprints]
    )

    global_operator_extra_links.extend(p.global_operator_extra_links)
    operator_extra_links.extend(list(p.operator_extra_links))

    # Register the class of every operator link under "<module>.<class name>".
    registered_operator_link_classes.update({
        "{}.{}".format(link_cls.__module__, link_cls.__name__): link_cls
        for link_cls in [link.__class__ for link in p.operator_extra_links]
    })