blob: d5c3407a86daec27dca4dd4e78674870223c0149 [file] [log] [blame]
# -*- coding: utf-8 -*-
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals
from builtins import object
import imp
import inspect
import os
import re
import sys
from airflow import configuration
from airflow.utils.log.logging_mixin import LoggingMixin
log = LoggingMixin().log
class AirflowPluginException(Exception):
pass
class AirflowPlugin(object):
name = None
operators = []
hooks = []
executors = []
macros = []
admin_views = []
flask_blueprints = []
menu_links = []
@classmethod
def validate(cls):
if not cls.name:
raise AirflowPluginException("Your plugin needs a name.")
plugins_folder = configuration.get('core', 'plugins_folder')
if not plugins_folder:
plugins_folder = configuration.get('core', 'airflow_home') + '/plugins'
plugins_folder = os.path.expanduser(plugins_folder)
if plugins_folder not in sys.path:
sys.path.append(plugins_folder)
plugins = []
norm_pattern = re.compile(r'[/|.]')
# Crawl through the plugins folder to find AirflowPlugin derivatives
for root, dirs, files in os.walk(plugins_folder, followlinks=True):
for f in files:
try:
filepath = os.path.join(root, f)
if not os.path.isfile(filepath):
continue
mod_name, file_ext = os.path.splitext(
os.path.split(filepath)[-1])
if file_ext != '.py':
continue
log.debug('Importing plugin module %s', filepath)
# normalize root path as namespace
namespace = '_'.join([re.sub(norm_pattern, '__', root), mod_name])
m = imp.load_source(namespace, filepath)
for obj in list(m.__dict__.values()):
if (
inspect.isclass(obj) and
issubclass(obj, AirflowPlugin) and
obj is not AirflowPlugin):
obj.validate()
if obj not in plugins:
plugins.append(obj)
except Exception as e:
log.exception(e)
log.error('Failed to import plugin %s', filepath)
def make_module(name, objects):
log.debug('Creating module %s', name)
name = name.lower()
module = imp.new_module(name)
module._name = name.split('.')[-1]
module._objects = objects
module.__dict__.update((o.__name__, o) for o in objects)
return module
# Plugin components to integrate as modules
operators_modules = []
hooks_modules = []
executors_modules = []
macros_modules = []
# Plugin components to integrate directly
admin_views = []
flask_blueprints = []
menu_links = []
for p in plugins:
operators_modules.append(
make_module('airflow.operators.' + p.name, p.operators))
hooks_modules.append(make_module('airflow.hooks.' + p.name, p.hooks))
executors_modules.append(
make_module('airflow.executors.' + p.name, p.executors))
macros_modules.append(make_module('airflow.macros.' + p.name, p.macros))
admin_views.extend(p.admin_views)
flask_blueprints.extend(p.flask_blueprints)
menu_links.extend(p.menu_links)