:py:mod:`airflow.models.xcom_arg`
=================================
.. py:module:: airflow.models.xcom_arg
Module Contents
---------------
Classes
~~~~~~~
.. autoapisummary::
airflow.models.xcom_arg.XComArg
airflow.models.xcom_arg.PlainXComArg
airflow.models.xcom_arg.MapXComArg
airflow.models.xcom_arg.ZipXComArg
Functions
~~~~~~~~~
.. autoapisummary::
airflow.models.xcom_arg.serialize_xcom_arg
airflow.models.xcom_arg.deserialize_xcom_arg
.. py:class:: XComArg
Bases: :py:obj:`airflow.models.taskmixin.DependencyMixin`
Reference to an XCom value pushed from another operator.
The implementation supports::
xcomarg >> op
xcomarg << op
op >> xcomarg # By BaseOperator code
op << xcomarg # By BaseOperator code
**Example**: Once you have a result from any operator (decorated or regular), you can write::
any_op = AnyOperator()
xcomarg = XComArg(any_op)
# or equivalently
xcomarg = any_op.output
my_op = MyOperator()
my_op >> xcomarg
This object can be used in legacy Operators via Jinja.
**Example**: You can make this result part of any generated string::
any_op = AnyOperator()
xcomarg = any_op.output
op1 = MyOperator(my_text_message=f"the value is {xcomarg}")
op2 = MyOperator(my_text_message=f"the value is {xcomarg['topic']}")
:param operator: Operator instance that the XComArg references.
:param key: Key used to pull the XCom value. Defaults to *XCOM_RETURN_KEY*,
i.e. the referenced operator's return value.
.. py:method:: iter_xcom_args(arg)
:staticmethod:
Return XComArg instances in an arbitrary value.
Recursively traverse ``arg`` and look for XComArg instances in any
collection objects, and instances with ``template_fields`` set.
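The traversal described above can be illustrated with a minimal, Airflow-free sketch. ``Ref`` and ``Holder`` are hypothetical stand-ins (for an XComArg and for an object that sets ``template_fields``), and ``iter_refs`` is an illustrative name rather than the library's implementation.

```python
from typing import Any, Iterator


class Ref:
    """Hypothetical stand-in for an XComArg."""

    def __init__(self, name: str) -> None:
        self.name = name


class Holder:
    """Hypothetical object that declares which attributes to inspect."""

    template_fields = ("payload",)

    def __init__(self, payload: Any) -> None:
        self.payload = payload


def iter_refs(arg: Any) -> Iterator[Ref]:
    """Yield every Ref found in ``arg``, descending into nested values."""
    if isinstance(arg, Ref):
        yield arg
    elif isinstance(arg, (list, tuple, set)):
        for item in arg:
            yield from iter_refs(item)
    elif isinstance(arg, dict):
        for item in arg.values():
            yield from iter_refs(item)
    elif hasattr(arg, "template_fields"):
        # Only the attributes named in template_fields are inspected.
        for field in arg.template_fields:
            yield from iter_refs(getattr(arg, field))


nested = {"a": [Ref("x"), "plain text"], "b": Holder((Ref("y"), 3))}
found = [ref.name for ref in iter_refs(nested)]
```

In this sketch, plain values such as strings and integers are skipped, so only the two references are collected.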
.. py:method:: apply_upstream_relationship(op, arg)
:staticmethod:
Set dependency for XComArgs.
This looks for XComArg objects in ``arg`` "deeply" (looking inside
collection objects and objects with ``template_fields`` set), and
sets the relationship to ``op`` on any found.
.. py:method:: roots()
:property:
Required by TaskMixin
.. py:method:: leaves()
:property:
Required by TaskMixin
.. py:method:: set_upstream(task_or_task_list, edge_modifier = None)
Proxy to underlying operator set_upstream method. Required by TaskMixin.
.. py:method:: set_downstream(task_or_task_list, edge_modifier = None)
Proxy to underlying operator set_downstream method. Required by TaskMixin.
.. py:method:: iter_references()
:abstractmethod:
Iterate through (operator, key) references.
.. py:method:: map(f)
.. py:method:: zip(*others, fillvalue = NOTSET)
.. py:method:: get_task_map_length(run_id, *, session)
:abstractmethod:
Inspect length of pushed value for task-mapping.
This is used to determine how many task instances the scheduler should
create for a downstream using this XComArg for task-mapping.
*None* may be returned if the XCom this depends on has not been pushed yet.
.. py:method:: resolve(context, session = NEW_SESSION)
:abstractmethod:
Pull XCom value.
This should only be called during ``op.execute()`` with an appropriate context.
:meta private:
.. py:class:: PlainXComArg(operator, key = XCOM_RETURN_KEY)
Bases: :py:obj:`XComArg`
Reference to one single XCom without any additional semantics.
This class should not be accessed directly, but only through XComArg. The
class inheritance chain and ``__new__`` are implemented in this slightly
convoluted way because we want to:
a. Allow the user to continue using XComArg directly for the simple
semantics (see documentation of the base class for details).
b. Make ``isinstance(thing, XComArg)`` be able to detect all kinds of XCom
references.
c. Not allow many properties of PlainXComArg (including ``__getitem__`` and
``__str__``) to exist on other kinds of XComArg implementations since
they don't make sense.
:meta private:
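The three goals listed above can be sketched with a small, self-contained pattern in which the base class's ``__new__`` hands back the "plain" subclass. The names ``Ref``, ``PlainRef`` and ``MappedRef`` are hypothetical, and this is a simplified illustration under those assumptions, not the Airflow source.

```python
class Ref:
    """Hypothetical base class that users may instantiate directly."""

    def __new__(cls, *args, **kwargs):
        # Calling the base class directly hands back the "plain" subclass.
        if cls is Ref:
            return PlainRef(*args, **kwargs)
        return super().__new__(cls)


class PlainRef(Ref):
    """Simple reference; the only kind that supports subscripting."""

    def __init__(self, source: str, key: str = "return_value") -> None:
        self.source = source
        self.key = key

    def __getitem__(self, item: str) -> "PlainRef":
        return PlainRef(self.source, item)


class MappedRef(Ref):
    """Another kind of reference, deliberately without ``__getitem__``."""

    def __init__(self, inner: Ref) -> None:
        self.inner = inner


plain = Ref("extract")      # actually builds a PlainRef
mapped = MappedRef(plain)   # still passes isinstance(..., Ref)
```

With this layout, ``isinstance(thing, Ref)`` matches every kind of reference, while subscripting stays available only on the plain kind.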
.. py:method:: __eq__(other)
Return self==value.
.. py:method:: __getitem__(item)
Implement ``xcomresult['some_result_key']``.
.. py:method:: __iter__()
Override iterable protocol to raise error explicitly.
The default ``__iter__`` implementation in Python calls ``__getitem__``
with 0, 1, 2, etc. until it hits an ``IndexError``. This does not work
well with our custom ``__getitem__`` implementation, and results in a poor
DAG-writing experience since a misplaced ``*`` expansion would create an
infinite loop consuming the entire DAG parser.
This override catches the error eagerly, so an incorrectly implemented
DAG fails fast and avoids wasting resources on nonsensical iterating.
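The fallback behaviour described above is plain Python and can be reproduced without Airflow. In the sketch below, ``Lazy`` and ``Guarded`` are hypothetical classes: the first defines only a ``__getitem__`` that never raises ``IndexError``, the second adds an ``__iter__`` that fails immediately. The error message is an assumption chosen for illustration.

```python
import itertools


class Lazy:
    """Hypothetical reference whose __getitem__ accepts any key."""

    def __init__(self, key: str = "return_value") -> None:
        self.key = key

    def __getitem__(self, item) -> "Lazy":
        # Never raises IndexError, so legacy iteration would never stop.
        return Lazy(key=str(item))


class Guarded(Lazy):
    """Same as Lazy, but refuses iteration up front."""

    def __iter__(self):
        raise TypeError("'Guarded' object is not iterable")


# islice bounds what would otherwise be an endless loop over Lazy().
first_three = [ref.key for ref in itertools.islice(Lazy(), 3)]

try:
    list(Guarded())
except TypeError as exc:
    error = str(exc)
```

Without the ``islice`` bound, ``list(Lazy())`` or ``[*Lazy()]`` would loop forever, which is the situation the override is meant to prevent.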
.. py:method:: __repr__()
Return repr(self).
.. py:method:: __str__()
Backward compatibility for old-style Jinja templating used in Airflow operators.
**Example**: to use an XComArg in a BashOperator::
BashOperator(bash_command=f"... {xcomarg} ...")
:return:
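As a rough illustration of how a string conversion can support Jinja-based operators, the sketch below renders a hypothetical ``Ref`` object into a template expression. The class and the exact template text are assumptions made for this example; the string Airflow actually produces may differ.

```python
class Ref:
    """Hypothetical reference to a value pushed by task ``task_id``."""

    def __init__(self, task_id: str, key: str = "return_value") -> None:
        self.task_id = task_id
        self.key = key

    def __str__(self) -> str:
        # Doubled braces emit literal "{{" and "}}" from an f-string.
        return (
            f"{{{{ task_instance.xcom_pull("
            f"task_ids='{self.task_id}', key='{self.key}') }}}}"
        )


# Embedding the reference in an f-string leaves a template for later rendering.
command = f"echo {Ref('extract')}"
```

The value is not pulled when the string is built; the embedded expression is only evaluated when the operator's templated field is rendered.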
.. py:method:: iter_references()
Iterate through (operator, key) references.
.. py:method:: map(f)
.. py:method:: zip(*others, fillvalue = NOTSET)
.. py:method:: get_task_map_length(run_id, *, session)
Inspect length of pushed value for task-mapping.
This is used to determine how many task instances the scheduler should
create for a downstream using this XComArg for task-mapping.
*None* may be returned if the XCom this depends on has not been pushed yet.
.. py:method:: resolve(context, session = NEW_SESSION)
Pull XCom value.
This should only be called during ``op.execute()`` with an appropriate context.
:meta private:
.. py:class:: MapXComArg(arg, callables)
Bases: :py:obj:`XComArg`
An XCom reference with ``map()`` call(s) applied.
This is based on an XComArg, but also applies a series of "transforms" that
convert the pulled XCom value.
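The idea of applying a series of transforms to a pulled value can be sketched in plain Python. ``apply_transforms`` is a hypothetical helper, and the list ``pulled`` stands in for a value that would normally come from XCom; this is an illustration of the concept, not the library's code.

```python
from typing import Any, Callable, Iterable, List, Sequence


def apply_transforms(
    values: Iterable[Any],
    callables: Sequence[Callable[[Any], Any]],
) -> List[Any]:
    """Run every element through each callable, in order."""
    resolved = []
    for value in values:
        for func in callables:
            value = func(value)
        resolved.append(value)
    return resolved


pulled = [1, 2, 3]                          # stand-in for the pulled XCom value
callables = [lambda x: x + 1]               # first map() call
callables = [*callables, lambda x: x * 10]  # chained second map() call
result = apply_transforms(pulled, callables)
```

Each transform changes the elements but not their number, so the sketch returns as many items as it was given.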
.. py:method:: __repr__()
Return repr(self).
.. py:method:: iter_references()
Iterate through (operator, key) references.
.. py:method:: map(f)
.. py:method:: get_task_map_length(run_id, *, session)
Inspect length of pushed value for task-mapping.
This is used to determine how many task instances the scheduler should
create for a downstream using this XComArg for task-mapping.
*None* may be returned if the XCom this depends on has not been pushed yet.
.. py:method:: resolve(context, session = NEW_SESSION)
Pull XCom value.
This should only be called during ``op.execute()`` with an appropriate context.
:meta private:
.. py:class:: ZipXComArg(args, *, fillvalue = NOTSET)
Bases: :py:obj:`XComArg`
An XCom reference with ``zip()`` applied.
This is constructed from multiple XComArg instances, and presents an
iterable that "zips" them together like the built-in ``zip()`` (and
``itertools.zip_longest()`` if ``fillvalue`` is provided).
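The difference between the two zipping modes can be shown with the standard library alone. In this sketch, ``zip_values`` is a hypothetical helper and ``NOTSET`` is a local sentinel defined for the example (it is not imported from Airflow); the input lists stand in for already-resolved XCom values.

```python
import itertools
from typing import Any, Iterator, Sequence, Tuple

NOTSET = object()  # local sentinel meaning "no fillvalue given"


def zip_values(
    args: Sequence[Sequence[Any]],
    fillvalue: Any = NOTSET,
) -> Iterator[Tuple[Any, ...]]:
    """Zip the inputs, padding shorter ones only when fillvalue is given."""
    if fillvalue is NOTSET:
        return zip(*args)
    return itertools.zip_longest(*args, fillvalue=fillvalue)


names = ["a", "b", "c"]
sizes = [1, 2]
short = list(zip_values([names, sizes]))
padded = list(zip_values([names, sizes], fillvalue=0))
```

Using a dedicated sentinel rather than ``None`` as the default lets a caller pass ``None`` as a legitimate fill value.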
.. py:method:: __repr__()
Return repr(self).
.. py:method:: iter_references()
Iterate through (operator, key) references.
.. py:method:: get_task_map_length(run_id, *, session)
Inspect length of pushed value for task-mapping.
This is used to determine how many task instances the scheduler should
create for a downstream using this XComArg for task-mapping.
*None* may be returned if the XCom this depends on has not been pushed yet.
.. py:method:: resolve(context, session = NEW_SESSION)
Pull XCom value.
This should only be called during ``op.execute()`` with an appropriate context.
:meta private:
.. py:function:: serialize_xcom_arg(value)
DAG serialization interface.
.. py:function:: deserialize_xcom_arg(data, dag)
DAG serialization interface.