.. Licensed to the Apache Software Foundation (ASF) under one
   or more contributor license agreements. See the NOTICE file
   distributed with this work for additional information
   regarding copyright ownership. The ASF licenses this file
   to you under the Apache License, Version 2.0 (the
   "License"); you may not use this file except in compliance
   with the License. You may obtain a copy of the License at

..   http://www.apache.org/licenses/LICENSE-2.0

.. Unless required by applicable law or agreed to in writing,
   software distributed under the License is distributed on an
   "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
   KIND, either express or implied. See the License for the
   specific language governing permissions and limitations
   under the License.

Creating a custom Operator
==========================

Airflow allows you to create new operators to suit your own or your team's requirements.
This extensibility is one of the many features that make Apache Airflow powerful.

You can create any operator you want by extending the :class:`airflow.models.baseoperator.BaseOperator` class.

There are two methods that you need to override in a derived class:

* Constructor - Define the parameters required for the operator. You only need to specify the arguments specific to your operator.
  Use the ``@apply_defaults`` decorator to fill unspecified arguments with ``default_args``. You can specify the ``default_args``
  in the DAG file. See :ref:`Default args <default-args>` for more details.

* Execute - The code to execute when the runner calls the operator. The method receives the
  Airflow context as a parameter that can be used to read config values.

Let's implement an example ``HelloOperator`` in a new file ``hello_operator.py``:

.. code-block:: python

    from airflow.models.baseoperator import BaseOperator
    from airflow.utils.decorators import apply_defaults


    class HelloOperator(BaseOperator):

        @apply_defaults
        def __init__(
                self,
                name: str,
                **kwargs) -> None:
            super().__init__(**kwargs)
            self.name = name

        def execute(self, context):
            message = "Hello {}".format(self.name)
            print(message)
            return message

.. note::

    For imports to work, you should place the file in a directory that
    is present in the :envvar:`PYTHONPATH` environment variable. Airflow adds the ``dags/``, ``plugins/``, and ``config/`` directories
    in the Airflow home to :envvar:`PYTHONPATH` by default. In our example,
    the file is placed in the ``custom_operator/`` directory.
    See :doc:`../modules_management` for details on how Python and Airflow manage modules.

You can now use the derived custom operator as follows:

.. code-block:: python

    from custom_operator.hello_operator import HelloOperator

    with dag:
        hello_task = HelloOperator(task_id='sample-task', name='foo_bar')

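If you rely on ``default_args``, any argument you leave unspecified on the operator is filled in
from the DAG's ``default_args`` by ``@apply_defaults``. A minimal sketch, assuming the DAG is defined
in the same file (the DAG id, ``start_date``, and ``default_args`` values are illustrative):

.. code-block:: python

    from datetime import datetime

    from airflow import DAG

    default_args = {'owner': 'airflow', 'retries': 1}

    with DAG('hello_dag', default_args=default_args, start_date=datetime(2021, 1, 1)) as dag:
        # 'owner' and 'retries' are not passed explicitly, so @apply_defaults
        # fills them in from the DAG's default_args
        hello_task = HelloOperator(task_id='sample-task', name='foo_bar')
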
If an operator communicates with an external service (API, database, etc.) it's a good idea
to implement the communication layer using a :ref:`custom-operator/hook`. In this way the implemented logic
can be reused by other users in different operators. Such an approach provides better decoupling and
utilization of the added integration than using a ``CustomServiceBaseOperator`` for each external service.

Another consideration is temporary state. If an operation requires in-memory state (for example
a job id that should be used in the ``on_kill`` method to cancel a request) then the state should be kept
in the operator, not in the hook. This way the service hook can be completely stateless and the whole
logic of an operation is in one place - in the operator.

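A sketch of this pattern is shown below; ``SubmitJobHook`` and its ``submit_job``, ``wait_for_job``,
and ``cancel_job`` methods are hypothetical stand-ins for a real service hook:

.. code-block:: python

    class JobOperator(BaseOperator):

        def execute(self, context):
            hook = SubmitJobHook()           # hypothetical stateless hook
            self.job_id = hook.submit_job()  # temporary state lives on the operator
            hook.wait_for_job(self.job_id)

        def on_kill(self):
            # the job id kept by execute() is available here to cancel the request
            SubmitJobHook().cancel_job(self.job_id)
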
.. _custom-operator/hook:

Hooks
^^^^^

Hooks act as an interface to communicate with external shared resources in a DAG.
For example, multiple tasks in a DAG can require access to a MySQL database. Instead of
creating a connection per task, you can retrieve a connection from the hook and utilize it.
Hooks also help to avoid storing connection auth parameters in a DAG.
See :doc:`connection` for how to create and manage connections and :doc:`apache-airflow-providers:index` for
details on how to add your custom connection types via providers.

Let's extend our previous example to fetch a name from MySQL:

.. code-block:: python

    from airflow.providers.mysql.hooks.mysql import MySqlHook


    class HelloDBOperator(BaseOperator):

        @apply_defaults
        def __init__(
                self,
                name: str,
                mysql_conn_id: str,
                database: str,
                **kwargs) -> None:
            super().__init__(**kwargs)
            self.name = name
            self.mysql_conn_id = mysql_conn_id
            self.database = database

        def execute(self, context):
            hook = MySqlHook(mysql_conn_id=self.mysql_conn_id,
                             schema=self.database)
            sql = "select name from user"
            # get_first returns the first row of the result as a tuple
            result = hook.get_first(sql)
            message = "Hello {}".format(result[0])
            print(message)
            return message

When the operator invokes the query on the hook object, a new connection gets created if it doesn't exist.
The hook retrieves the auth parameters such as username and password from the Airflow
backend and passes the params to :py:func:`airflow.hooks.base.BaseHook.get_connection`.
You should create a hook only in the ``execute`` method or any method which is called from ``execute``.
The constructor gets called whenever Airflow parses a DAG, which happens frequently, and instantiating a hook
there will result in many unnecessary database connections.
The ``execute`` method gets called only during a DAG run.

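To restate the rule with the ``HelloDBOperator`` above (a condensed sketch, not the full operator):

.. code-block:: python

    class HelloDBOperator(BaseOperator):

        def __init__(self, mysql_conn_id: str, **kwargs) -> None:
            super().__init__(**kwargs)
            # Only store plain parameters here: __init__ runs every time
            # the scheduler parses the DAG file
            self.mysql_conn_id = mysql_conn_id

        def execute(self, context):
            # Create the hook here: execute runs only when the task actually runs
            hook = MySqlHook(mysql_conn_id=self.mysql_conn_id)
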
User interface
^^^^^^^^^^^^^^

Airflow also allows the developer to control how the operator shows up in the DAG UI.
Override ``ui_color`` to change the background color of the operator in the UI.
Override ``ui_fgcolor`` to change the color of the label.

.. code-block:: python

    class HelloOperator(BaseOperator):
        ui_color = '#ff0000'
        ui_fgcolor = '#000000'
        ...

Templating
^^^^^^^^^^

You can use :ref:`Jinja templates <jinja-templating>` to parameterize your operator.
Airflow considers the field names present in ``template_fields`` for templating while rendering
the operator.

.. code-block:: python

    class HelloOperator(BaseOperator):

        template_fields = ['name']

        @apply_defaults
        def __init__(
                self,
                name: str,
                **kwargs) -> None:
            super().__init__(**kwargs)
            self.name = name

        def execute(self, context):
            message = "Hello from {}".format(self.name)
            print(message)
            return message

You can use the template as follows:

.. code-block:: python

    with dag:
        hello_task = HelloOperator(task_id='task_id_1', name='{{ task_instance.task_id }}')

In this example, Jinja looks for the ``name`` parameter and substitutes ``{{ task_instance.task_id }}`` with
``task_id_1``.

The parameter can also contain a file name, for example, a bash script or a SQL file. You need to add
the extension of your file in ``template_ext``. If a ``template_field`` contains a string ending with
an extension mentioned in ``template_ext``, Jinja reads the content of the file and replaces the templates
with actual values. Note that Jinja substitutes the operator attributes and not the args.

.. code-block:: python

    class HelloOperator(BaseOperator):

        template_fields = ['guest_name']
        template_ext = ['.sql']

        @apply_defaults
        def __init__(
                self,
                name: str,
                **kwargs) -> None:
            super().__init__(**kwargs)
            self.guest_name = name

In the example, the ``template_fields`` should be ``['guest_name']`` and not ``['name']``, since templating
is applied to the operator attribute rather than the constructor argument.

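For instance, you could pass a file name ending in ``.sql`` so that Jinja loads and templates the file
content (``greeting.sql`` is a hypothetical file resolvable relative to the DAG folder):

.. code-block:: python

    hello_task = HelloOperator(task_id='hello', name='greeting.sql')
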
Additionally, you may provide a ``template_fields_renderers`` dictionary which defines in what style the value
from the template field is rendered in the Web UI. For example:

.. code-block:: python

    class MyRequestOperator(BaseOperator):

        template_fields = ['request_body']
        template_fields_renderers = {'request_body': 'json'}

        @apply_defaults
        def __init__(
                self,
                request_body: str,
                **kwargs) -> None:
            super().__init__(**kwargs)
            self.request_body = request_body

Currently available lexers:

- bash
- doc
- json
- md
- py
- rst
- sql
- yaml

If you use a non-existing lexer then the value of the template field will be rendered as a pretty-printed object.

Define an operator extra link
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

For your operator, you can :doc:`define an extra link <define_extra_link>` that can
redirect users to external systems. For example, you can add a link that redirects
the user to the operator's manual.

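As a minimal sketch (the link name and URL are illustrative; see :doc:`define_extra_link` for full details),
an extra link is declared as a ``BaseOperatorLink`` subclass and attached to the operator via
``operator_extra_links``:

.. code-block:: python

    from airflow.models.baseoperator import BaseOperator, BaseOperatorLink


    class GoogleLink(BaseOperatorLink):
        name = 'Google'

        def get_link(self, operator, dttm):
            return 'https://www.google.com'


    class MyFirstOperator(BaseOperator):
        operator_extra_links = (GoogleLink(),)
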
Sensors
^^^^^^^

Airflow provides a primitive for a special kind of operator, whose purpose is to
poll some state (e.g. presence of a file) on a regular interval until a
success criterion is met.

You can create any sensor you want by extending the :class:`airflow.sensors.base.BaseSensorOperator`,
defining a ``poke`` method to poll your external state and evaluate the success criterion.

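For example, a minimal sensor that waits for a file to appear might look like the following sketch
(Airflow already ships a ready-made ``FileSensor``; this is purely illustrative):

.. code-block:: python

    import os

    from airflow.sensors.base import BaseSensorOperator
    from airflow.utils.decorators import apply_defaults


    class WaitForFileSensor(BaseSensorOperator):

        @apply_defaults
        def __init__(self, filepath: str, **kwargs) -> None:
            super().__init__(**kwargs)
            self.filepath = filepath

        def poke(self, context):
            # Called repeatedly, once per poke_interval, until it returns True
            return os.path.exists(self.filepath)
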
Sensors have a powerful feature called ``'reschedule'`` mode which allows the sensor
task to be rescheduled, rather than blocking a worker slot between pokes.
This is useful when you can tolerate a longer poll interval and expect to be
polling for a long time.

Reschedule mode comes with a caveat that your sensor cannot maintain internal state
between rescheduled executions. In this case you should decorate your sensor with
:meth:`airflow.sensors.base.poke_mode_only`. This will let users know
that your sensor is not suitable for use with reschedule mode.

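Applying the decorator is a one-liner (a sketch):

.. code-block:: python

    from airflow.sensors.base import BaseSensorOperator, poke_mode_only


    @poke_mode_only
    class StatefulSensor(BaseSensorOperator):
        # This sensor keeps internal state between pokes, so 'reschedule'
        # mode would lose it; the decorator enforces poke mode.
        ...
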
An example of a sensor that keeps internal state and cannot be used with reschedule mode
is :class:`airflow.providers.google.cloud.sensors.gcs.GCSUploadSessionCompleteSensor`.
It polls the number of objects at a prefix (this number is the internal state of the sensor)
and succeeds when a certain amount of time has passed without the number of objects changing.