.. Licensed to the Apache Software Foundation (ASF) under one
   or more contributor license agreements.  See the NOTICE file
   distributed with this work for additional information
   regarding copyright ownership.  The ASF licenses this file
   to you under the Apache License, Version 2.0 (the
   "License"); you may not use this file except in compliance
   with the License.  You may obtain a copy of the License at

..   http://www.apache.org/licenses/LICENSE-2.0

.. Unless required by applicable law or agreed to in writing,
   software distributed under the License is distributed on an
   "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
   KIND, either express or implied.  See the License for the
   specific language governing permissions and limitations
   under the License.

Operators
=========
An Operator is conceptually a template for a predefined :doc:`Task <tasks>` that you can just define declaratively inside your DAG::
    with DAG("my-dag") as dag:
        ping = SimpleHttpOperator(endpoint="http://example.com/update/")
        email = EmailOperator(to="admin@example.com", subject="Update complete")

        ping >> email
Airflow has a very extensive set of operators available, with some built into the core or pre-installed providers. Some popular operators from core include:
- :class:`~airflow.operators.bash.BashOperator` - executes a bash command
- :class:`~airflow.operators.python.PythonOperator` - calls an arbitrary Python function
- :class:`~airflow.operators.email.EmailOperator` - sends an email
For a list of all core operators, see: :doc:`Core Operators and Hooks Reference </operators-and-hooks-ref>`.
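As a quick illustration, here is a minimal sketch wiring two of these core operators together (the DAG id, task ids, command, and callable are placeholders invented for this example):

.. code-block:: python

    from airflow import DAG
    from airflow.operators.bash import BashOperator
    from airflow.operators.python import PythonOperator


    def report():
        # Placeholder callable for the PythonOperator
        print("extraction finished")


    with DAG("core-operators-demo") as dag:
        extract = BashOperator(task_id="extract", bash_command="echo 'extracting...'")
        notify = PythonOperator(task_id="notify", python_callable=report)

        extract >> notify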
If the operator you need isn't installed with Airflow by default, you can probably find it as part of our huge set of community :doc:`provider packages <apache-airflow-providers:index>`. Some popular operators from here include:
- :class:`~airflow.providers.http.operators.http.SimpleHttpOperator`
- :class:`~airflow.providers.mysql.operators.mysql.MySqlOperator`
- :class:`~airflow.providers.postgres.operators.postgres.PostgresOperator`
- :class:`~airflow.providers.microsoft.mssql.operators.mssql.MsSqlOperator`
- :class:`~airflow.providers.oracle.operators.oracle.OracleOperator`
- :class:`~airflow.providers.jdbc.operators.jdbc.JdbcOperator`
- :class:`~airflow.providers.docker.operators.docker.DockerOperator`
- :class:`~airflow.providers.apache.hive.operators.hive.HiveOperator`
- :class:`~airflow.providers.amazon.aws.operators.s3_file_transform.S3FileTransformOperator`
- :class:`~airflow.providers.mysql.transfers.presto_to_mysql.PrestoToMySqlOperator`
- :class:`~airflow.providers.slack.operators.slack.SlackAPIOperator`
But there are many, many more - you can see the full list of all community-managed operators, hooks, sensors
and transfers in our
:doc:`providers packages <apache-airflow-providers:operators-and-hooks-ref/index>` documentation.
.. note::

    Inside Airflow's code, we often mix the concepts of :doc:`tasks` and Operators, and they are mostly
    interchangeable. However, when we talk about a *Task*, we mean the generic "unit of execution" of a
    DAG; when we talk about an *Operator*, we mean a reusable, pre-made Task template whose logic
    is all done for you and that just needs some arguments.
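To make the distinction concrete, an Operator can be as small as a ``BaseOperator`` subclass whose logic lives in ``execute`` and whose arguments arrive through ``__init__``. A minimal sketch (the ``HelloOperator`` name and its ``name`` argument are invented for illustration):

.. code-block:: python

    from airflow.models.baseoperator import BaseOperator


    class HelloOperator(BaseOperator):
        """A hypothetical Operator: the logic is written once, users only pass arguments."""

        def __init__(self, name: str, **kwargs):
            super().__init__(**kwargs)
            self.name = name

        def execute(self, context):
            # Called when the task instance runs
            print(f"Hello, {self.name}!")

Each use of ``HelloOperator(task_id="greet", name="Airflow")`` inside a DAG then becomes one Task.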
.. _concepts:jinja-templating:
Jinja Templating
----------------
Airflow leverages the power of `Jinja Templating <http://jinja.pocoo.org/docs/dev/>`_, which can be a powerful tool to use in combination with :ref:`macros <templates-ref>`.
For example, say you want to pass the start of the data interval as an environment variable to a Bash script using the ``BashOperator``:
.. code-block:: python

    # The start of the data interval as YYYY-MM-DD
    date = "{{ ds }}"
    t = BashOperator(
        task_id="test_env",
        # Note the trailing space: a bash_command ending in ".sh" would be
        # treated as the path of a script file to load as a Jinja template.
        bash_command="/tmp/test.sh ",
        dag=dag,
        env={"DATA_INTERVAL_START": date},
    )
Here, ``{{ ds }}`` is a templated variable, and because the ``env`` parameter of the ``BashOperator`` is templated with Jinja, the data interval's start date will be available as an environment variable named ``DATA_INTERVAL_START`` in your Bash script.
You can use Jinja templating with every parameter that is marked as "templated" in the documentation. Template substitution occurs just before the ``pre_execute`` function of your operator is called.
You can also use Jinja templating with nested fields, as long as these nested fields are marked as templated in the structure they belong to: fields registered in the ``template_fields`` property will be submitted for template substitution, like the ``path`` field in the example below:
.. code-block:: python

    class MyDataReader:
        template_fields = ["path"]

        def __init__(self, my_path):
            self.path = my_path

        # [additional code here...]


    t = PythonOperator(
        task_id="transform_data",
        python_callable=transform_data,
        op_args=[MyDataReader("/tmp/{{ ds }}/my_file")],
        dag=dag,
    )
.. note:: The ``template_fields`` property can equally be a class variable or an instance variable.
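The instance-variable form simply registers the fields from ``__init__``; a minimal sketch mirroring the ``MyDataReader`` class above:

.. code-block:: python

    class MyDataReader:
        def __init__(self, my_path):
            self.path = my_path
            # Equivalent to the class-variable form shown earlier
            self.template_fields = ["path"]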
Deep nested fields can also be substituted, as long as all intermediate fields are marked as template fields:
.. code-block:: python

    class MyDataTransformer:
        template_fields = ["reader"]

        def __init__(self, my_reader):
            self.reader = my_reader

        # [additional code here...]


    class MyDataReader:
        template_fields = ["path"]

        def __init__(self, my_path):
            self.path = my_path

        # [additional code here...]


    t = PythonOperator(
        task_id="transform_data",
        python_callable=transform_data,
        op_args=[MyDataTransformer(MyDataReader("/tmp/{{ ds }}/my_file"))],
        dag=dag,
    )
You can pass custom options to the Jinja ``Environment`` when creating your DAG. One common usage is to keep Jinja from dropping the trailing newline of a template string:
.. code-block:: python

    my_dag = DAG(
        dag_id="my-dag",
        jinja_environment_kwargs={
            "keep_trailing_newline": True,
            # some other jinja2 Environment options here
        },
    )
See the `Jinja documentation <https://jinja.palletsprojects.com/en/2.11.x/api/#jinja2.Environment>`_ to find all available options.
Rendering Fields as Native Python Objects
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
By default, all the ``template_fields`` are rendered as strings.
For example, say the ``extract`` task pushes a dictionary
(such as ``{"1001": 301.27, "1002": 433.21, "1003": 502.22}``) to the :ref:`XCom <concepts:xcom>` table.
Now, when the following task runs, the ``order_data`` argument is passed a string, e.g.
``'{"1001": 301.27, "1002": 433.21, "1003": 502.22}'``.
.. code-block:: python

    transform = PythonOperator(
        task_id="transform",
        op_kwargs={"order_data": "{{ti.xcom_pull('extract')}}"},
        python_callable=transform,
    )
If you instead want the rendered template field to return a native Python object (a ``dict`` in our example),
you can pass ``render_template_as_native_obj=True`` to the DAG as follows:
.. code-block:: python

    import json

    import pendulum

    from airflow import DAG
    from airflow.operators.python import PythonOperator

    dag = DAG(
        dag_id="example_template_as_python_object",
        schedule_interval=None,
        start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
        catchup=False,
        render_template_as_native_obj=True,
    )


    def extract():
        data_string = '{"1001": 301.27, "1002": 433.21, "1003": 502.22}'
        return json.loads(data_string)


    def transform(order_data):
        print(type(order_data))
        total_order_value = 0
        for value in order_data.values():
            total_order_value += value
        return {"total_order_value": total_order_value}


    extract_task = PythonOperator(task_id="extract", python_callable=extract, dag=dag)
    transform_task = PythonOperator(
        task_id="transform",
        op_kwargs={"order_data": "{{ti.xcom_pull('extract')}}"},
        python_callable=transform,
        dag=dag,
    )

    extract_task >> transform_task
In this case, the ``order_data`` argument is passed the actual ``dict``: ``{"1001": 301.27, "1002": 433.21, "1003": 502.22}``.
Airflow uses Jinja's `NativeEnvironment <https://jinja.palletsprojects.com/en/2.11.x/nativetypes/>`_
when ``render_template_as_native_obj`` is set to ``True``.
With ``NativeEnvironment``, rendering a template produces a native Python type.
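The difference is easy to see with ``jinja2`` alone; here is a minimal sketch, outside of Airflow, comparing the two environments:

.. code-block:: python

    from jinja2 import Environment
    from jinja2.nativetypes import NativeEnvironment

    template = "{{ data }}"

    # The default Environment always renders to a string
    s = Environment().from_string(template).render(data=[1, 2, 3])
    print(type(s))  # <class 'str'>

    # NativeEnvironment evaluates the rendered result as native Python
    n = NativeEnvironment().from_string(template).render(data=[1, 2, 3])
    print(type(n))  # <class 'list'>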
.. _concepts:reserved-keywords:
Reserved params keyword
-----------------------
As of Apache Airflow 2.2.0, the ``params`` variable is used during DAG serialization. Please do not use that name in third-party operators.
If you upgrade your environment and get the following error:

.. code-block::

    AttributeError: 'str' object has no attribute '__module__'

rename ``params`` in your operators.
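As an illustration, a custom operator along these lines collides with the reserved name; the ``MyOperator`` class and the ``my_params`` rename are hypothetical:

.. code-block:: python

    from airflow.models.baseoperator import BaseOperator


    class MyOperator(BaseOperator):
        # Problematic: an argument named "params" shadows the attribute
        # Airflow uses during DAG serialization.
        # def __init__(self, params, **kwargs):
        #     super().__init__(**kwargs)
        #     self.params = params

        # Safe: any other name avoids the clash
        def __init__(self, my_params, **kwargs):
            super().__init__(**kwargs)
            self.my_params = my_params

        def execute(self, context):
            print(self.my_params)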