.. Licensed to the Apache Software Foundation (ASF) under one
   or more contributor license agreements. See the NOTICE file
   distributed with this work for additional information
   regarding copyright ownership. The ASF licenses this file
   to you under the Apache License, Version 2.0 (the
   "License"); you may not use this file except in compliance
   with the License. You may obtain a copy of the License at

..   http://www.apache.org/licenses/LICENSE-2.0

.. Unless required by applicable law or agreed to in writing,
   software distributed under the License is distributed on an
   "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
   KIND, either express or implied. See the License for the
   specific language governing permissions and limitations
   under the License.

Listener Plugin of Airflow
==========================

Airflow has a feature that allows you to add listeners for monitoring and tracking
the task state using Plugins.

This is a simple example listener plugin of Airflow that helps to track the task
state and collect useful metadata information about the task, dag run and dag.

This plugin works by using SQLAlchemy's event mechanism. It watches
task instance state changes at the table level and triggers an event on each change.
These events are emitted for all tasks across all the DAGs.

In this plugin, an object reference is derived from the base class
``airflow.plugins_manager.AirflowPlugin``.

The listener plugin uses pluggy under the hood. Pluggy is a framework for plugin
management and hook calling, originally built for pytest. Pluggy enables function hooking,
so it allows building "pluggable" systems with your own customization on top of that hooking.

Using this plugin, the following events can be listened to (see the sketch after this list):

* task instance is in running state.
* task instance is in success state.
* task instance is in failure state.
* dag run is in running state.
* dag run is in success state.
* dag run is in failure state.
* an Airflow job (such as the scheduler or a backfill job) is about to start.
* an Airflow job (such as the scheduler or a backfill job) is about to stop.
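
As a rough illustration of how these hooks look in practice, here is a minimal sketch of
what a ``listener.py`` module might contain. The hook names and signatures below are
assumed to match the example listener shipped with Airflow (referenced later in this
document); verify them against your Airflow version.

.. code-block:: python

    # Minimal sketch of a listener module, e.g. listener.py.
    # Hook names and signatures are assumed to follow the example listener
    # shipped with Airflow; check them against your Airflow version.
    from airflow.listeners import hookimpl


    @hookimpl
    def on_task_instance_running(previous_state, task_instance, session):
        # Called whenever any task instance, in any DAG, enters the running state.
        print(f"Task instance {task_instance.task_id} is running")


    @hookimpl
    def on_starting(component):
        # Called before an Airflow job component (e.g. the scheduler) starts.
        print(f"Component {type(component).__name__} is starting")
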
Listener Registration
---------------------

A listener plugin with an object reference to the listener module is registered
as part of an Airflow plugin. The following is a
skeleton for implementing a new listener:

.. code-block:: python

    from airflow.plugins_manager import AirflowPlugin

    # "listener" is the module (e.g. listener.py) where the custom monitoring
    # code is implemented via hookimpl-decorated functions.
    import listener


    class MetadataCollectionPlugin(AirflowPlugin):
        name = "MetadataCollectionPlugin"
        listeners = [listener]

Next, we can look at the code added to ``listener`` and see the implementation
methods for each of those listeners. After the implementation, the listener
gets executed for every task execution across all the DAGs.


For reference, here's the listener code within ``listener.py`` that implements these hooks.

This example listens for the task instance entering the running state:

.. exampleinclude:: ../../../airflow/example_dags/plugins/event_listener.py
    :language: python
    :start-after: [START howto_listen_ti_running_task]
    :end-before: [END howto_listen_ti_running_task]

Similarly, code to listen for task_instance success and failure can be implemented.
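
As an illustrative sketch (not taken from the example file), such hooks might look like the
following, assuming the same hook names and signatures as the example listener above:

.. code-block:: python

    from airflow.listeners import hookimpl


    @hookimpl
    def on_task_instance_success(previous_state, task_instance, session):
        # Called after any task instance finishes successfully.
        print(f"Task instance {task_instance.task_id} succeeded")


    @hookimpl
    def on_task_instance_failed(previous_state, task_instance, session):
        # Called after any task instance fails.
        print(f"Task instance {task_instance.task_id} failed")
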
This example listens for the dag run changing to the failed state:

.. exampleinclude:: ../../../airflow/example_dags/plugins/event_listener.py
    :language: python
    :start-after: [START howto_listen_dagrun_failure_task]
    :end-before: [END howto_listen_dagrun_failure_task]

Similarly, code to listen for dag_run success and for the running state can be implemented.
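
As a rough sketch (again assuming the hook names and signatures from the example listener),
the dag run hooks could look like this:

.. code-block:: python

    from airflow.listeners import hookimpl


    @hookimpl
    def on_dag_run_running(dag_run, msg):
        # Called when a dag run enters the running state.
        print(f"Dag run {dag_run.dag_id} is running")


    @hookimpl
    def on_dag_run_success(dag_run, msg):
        # Called after a dag run finishes successfully.
        print(f"Dag run {dag_run.dag_id} succeeded")
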
The listener plugin files containing the listener implementation are added, as part of the
Airflow plugin, into the ``$AIRFLOW_HOME/plugins/`` folder and are loaded during Airflow startup.