 .. Licensed to the Apache Software Foundation (ASF) under one
    or more contributor license agreements. See the NOTICE file
    distributed with this work for additional information
    regarding copyright ownership. The ASF licenses this file
    to you under the Apache License, Version 2.0 (the
    "License"); you may not use this file except in compliance
    with the License. You may obtain a copy of the License at

 ..   http://www.apache.org/licenses/LICENSE-2.0

 .. Unless required by applicable law or agreed to in writing,
    software distributed under the License is distributed on an
    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    KIND, either express or implied. See the License for the
    specific language governing permissions and limitations
    under the License.

Serialization
=============
To exchange data between tasks, such as arguments, Airflow needs to serialize the data and deserialize it again
when a downstream task requires it. Serialization also happens so that the webserver and the scheduler
(as opposed to the DAG processor) do not need to read the DAG file. This is done for security and efficiency.

Serialization is a surprisingly hard job. Out of the box, Python only supports serialization of primitives,
like ``str`` and ``int``, and it loops over iterables. When things become more complex, custom serialization is required.

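
As a small illustration of this limitation, the sketch below assumes the standard-library ``json`` encoder as
the "out of the box" serializer. It handles primitives and containers of primitives but rejects other types
such as ``Decimal``. The helper name ``try_dump`` is hypothetical and exists only for this example.

.. code-block:: python

    import json
    from decimal import Decimal


    def try_dump(value):
        """Return the JSON text for value, or None if json cannot encode it."""
        try:
            return json.dumps(value)
        except TypeError:
            # json raises TypeError for objects it has no encoding for
            return None


    # Primitives and iterables of primitives are encoded directly.
    primitive_result = try_dump({"a": 1, "b": ["x", 2.5]})

    # A Decimal is not a primitive, so the default encoder gives up.
    complex_result = try_dump({"price": Decimal("1.10")})

Here ``primitive_result`` holds a JSON string while ``complex_result`` is ``None``, which is the gap that custom
serialization fills.
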
Airflow supports three ways of custom serialization out of the box. Primitives are returned as is, without
additional encoding, e.g. a ``str`` remains a ``str``. When the object is not a primitive (or an iterable thereof),
Airflow first looks for a registered serializer and deserializer in the namespace of ``airflow.serialization.serializers``.
If none is found, it looks in the class for a ``serialize()`` method or, in the case of deserialization, a
``deserialize(data, version: int)`` method. Finally, if the class is decorated with either ``@dataclass``
or ``@attr.define``, it uses the public methods for those decorators.

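
For the decorator route, a minimal standard-library sketch follows. It assumes that the "public methods"
mentioned above are helpers such as ``dataclasses.asdict``. ``Point`` is a hypothetical class used only for
illustration, and the code is not Airflow's own code path.

.. code-block:: python

    from dataclasses import asdict, dataclass, is_dataclass


    @dataclass
    class Point:  # hypothetical example class
        x: int
        y: int


    p = Point(x=1, y=2)

    # A decorated class can be detected without any custom methods on it.
    detected = is_dataclass(p)

    # asdict() yields a plain dict with primitive keys and values.
    data = asdict(p)

    # The same dict is enough to rebuild an equal instance.
    restored = Point(**data)

Because the decorator already exposes the fields, no ``serialize`` or ``deserialize`` method has to be written
for a class like this.
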
If you are looking to extend Airflow with a new serializer, it is good to know when to choose which way of serialization.
Objects that are under the control of Airflow, i.e. residing under the namespace of ``airflow.*`` like
``airflow.models.dag.DAG``, or under the control of the developer, e.g. ``my.company.Foo``, should first be examined to see
whether they can be decorated with ``@attr.define`` or ``@dataclass``. If that is not possible, then the ``serialize``
and ``deserialize`` methods should be implemented. The ``serialize`` method should return a primitive or a dict.
It does not need to serialize the values in the dict, as that will be taken care of, but the keys should be of a
primitive form.

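
The requirement on keys can be sketched as follows. ``Holidays`` is a hypothetical class, not part of Airflow,
and converting ``date`` keys to ISO strings is just one possible way to obtain primitive keys.

.. code-block:: python

    from datetime import date
    from typing import Any, ClassVar


    class Holidays:  # hypothetical class for illustration
        __version__: ClassVar[int] = 1

        def __init__(self, names_by_day: dict[date, str]) -> None:
            self.names_by_day = names_by_day

        def serialize(self) -> dict[str, Any]:
            # Keys must be primitives, so each date becomes an ISO string.
            return {day.isoformat(): name for day, name in self.names_by_day.items()}

        @staticmethod
        def deserialize(data: dict[str, Any], version: int) -> "Holidays":
            if version > Holidays.__version__:
                raise TypeError(f"serialized version {version} is newer than {Holidays.__version__}")
            # Turn the string keys back into dates.
            return Holidays({date.fromisoformat(day): name for day, name in data.items()})


    original = Holidays({date(2024, 1, 1): "New Year"})
    payload = original.serialize()
    restored = Holidays.deserialize(payload, 1)
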
Objects that are not under the control of Airflow, e.g. ``numpy.int16``, need a registered serializer and deserializer.
Versioning is required. Primitives can be returned, as can dicts. Again, ``dict`` values do not need to be serialized,
but the keys need to be of primitive form. If you are implementing a registered serializer, take special care
not to introduce circular imports. Typically, this can be avoided by using ``str`` entries when populating the list
of serializers, like so: ``serializers = ["my.company.Foo"]`` instead of ``serializers = [Foo]``.

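
Why a string entry helps can be sketched with the standard library alone. A dotted path does not import
anything when the serializer module is loaded; the class is only looked up when it is needed. The helper
``resolve`` is hypothetical and is not Airflow's loader, and ``decimal.Decimal`` stands in for ``my.company.Foo``
so that the example runs anywhere.

.. code-block:: python

    from importlib import import_module

    # Listing the class by its dotted path avoids importing it at module load time.
    serializers = ["decimal.Decimal"]


    def resolve(dotted_path: str) -> type:
        """Import the module part of dotted_path and return the named attribute."""
        module_name, _, attr = dotted_path.rpartition(".")
        return getattr(import_module(module_name), attr)


    # The import happens here, on demand, rather than at the top of the module.
    resolved = [resolve(path) for path in serializers]
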
.. note::

  Serialization and deserialization are performance-sensitive. Use built-in types like ``dict`` as much as you can and stay away from classes and other complex structures.

Airflow Object
--------------
.. code-block:: python

    from typing import Any, ClassVar


    class Foo:
        __version__: ClassVar[int] = 1

        def __init__(self, a, v) -> None:
            self.a = a
            self.b = {"x": v}

        def serialize(self) -> dict[str, Any]:
            return {
                "a": self.a,
                "b": self.b,
            }

        @staticmethod
        def deserialize(data: dict[str, Any], version: int):
            # __init__ requires ``v``; pass a placeholder and restore ``b`` from the data
            f = Foo(a=data["a"], v=None)
            f.b = data["b"]
            return f

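
One way the ``version`` argument might be used is sketched below. ``Endpoint`` and its older ``"address"``
layout are hypothetical and serve only to show a ``deserialize`` method that accepts data written by an
earlier version of the class while rejecting data from a newer one.

.. code-block:: python

    from typing import Any, ClassVar


    class Endpoint:  # hypothetical class for illustration
        __version__: ClassVar[int] = 2

        def __init__(self, host: str, port: int) -> None:
            self.host = host
            self.port = port

        def serialize(self) -> dict[str, Any]:
            return {"host": self.host, "port": self.port}

        @staticmethod
        def deserialize(data: dict[str, Any], version: int) -> "Endpoint":
            # Data written by a newer class version cannot be trusted to fit.
            if version > Endpoint.__version__:
                raise TypeError(f"serialized version {version} is newer than {Endpoint.__version__}")
            if version == 1:
                # Assumed older layout: a single "address" string such as "db:5432".
                host, _, port = data["address"].rpartition(":")
                return Endpoint(host, int(port))
            return Endpoint(data["host"], data["port"])


    old = Endpoint.deserialize({"address": "db:5432"}, 1)
    new = Endpoint.deserialize(Endpoint("cache", 6379).serialize(), 2)
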
Registered
^^^^^^^^^^
.. code-block:: python

    from __future__ import annotations

    from decimal import Decimal
    from typing import TYPE_CHECKING

    from airflow.utils.module_loading import qualname

    if TYPE_CHECKING:
        from airflow.serialization.serde import U

    # Entries can be a type or a fully qualified str; a str can be used to prevent circular imports.
    serializers = [Decimal]

    deserializers = serializers  # in some cases you might not have a deserializer (e.g. k8s pod)

    __version__ = 1  # required


    # the serializer is expected to return: output, classname, version, is_serialized
    def serialize(o: object) -> tuple[U, str, int, bool]:
        if isinstance(o, Decimal):
            name = qualname(o)
            _, _, exponent = o.as_tuple()
            # exponent is a str for NaN and Infinity, so check the type before comparing
            if isinstance(exponent, int) and exponent >= 0:  # No digits after the decimal point.
                return int(o), name, __version__, True
            # Technically lossy due to floating point errors, but the best we
            # can do without implementing a custom encode function.
            return float(o), name, __version__, True

        return "", "", 0, False


    # the deserializer sanitizes the data for you, so you do not need to deserialize values yourself
    def deserialize(classname: str, version: int, data: object) -> Decimal:
        # always check version compatibility
        if version > __version__:
            raise TypeError(f"serialized {version} of {classname} > {__version__}")

        if classname != qualname(Decimal):
            raise TypeError(f"{classname} != {qualname(Decimal)}")

        return Decimal(str(data))
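
The "technically lossy" remark in the serializer can be made concrete with a standalone sketch that does not
import Airflow. The helpers ``to_primitive`` and ``round_trip`` are hypothetical; they only mimic the
``int``/``float`` choice and the ``Decimal(str(data))`` reconstruction shown above.

.. code-block:: python

    from decimal import Decimal


    def to_primitive(value: Decimal):
        """Pick int for whole numbers and float otherwise."""
        exponent = value.as_tuple().exponent
        # exponent is a str for NaN and Infinity, hence the type check
        if isinstance(exponent, int) and exponent >= 0:
            return int(value)
        return float(value)


    def round_trip(value: Decimal) -> Decimal:
        """Convert to a primitive and rebuild a Decimal from its string form."""
        return Decimal(str(to_primitive(value)))


    whole = round_trip(Decimal("100"))
    short = round_trip(Decimal("1.25"))
    long_fraction = round_trip(Decimal("0.12345678901234567890123"))

Whole numbers and short fractions survive the round trip, while a value with more significant digits than a
``float`` can hold comes back changed. If exact precision matters, returning a ``str`` from the serializer would
be one alternative to consider.
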