blob: dff3e6ee9e309facd738c9eeada27d41c3f766c5 [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations
from typing import TYPE_CHECKING, Any, NamedTuple, Sequence
from pendulum import DateTime
from airflow.typing_compat import Protocol, runtime_checkable
if TYPE_CHECKING:
from airflow.utils.types import DagRunType
class DataInterval(NamedTuple):
"""A data interval for a DagRun to operate over.
Both ``start`` and ``end`` **MUST** be "aware", i.e. contain timezone
information.
"""
start: DateTime
end: DateTime
@classmethod
def exact(cls, at: DateTime) -> DataInterval:
"""Represent an "interval" containing only an exact time."""
return cls(start=at, end=at)
class TimeRestriction(NamedTuple):
"""Restriction on when a DAG can be scheduled for a run.
Specifically, the run must not be earlier than ``earliest``, nor later than
``latest``. If ``catchup`` is *False*, the run must also not be earlier than
the current time, i.e. "missed" schedules are not backfilled.
These values are generally set on the DAG or task's ``start_date``,
``end_date``, and ``catchup`` arguments.
Both ``earliest`` and ``latest``, if not *None*, are inclusive; a DAG run
can happen exactly at either point of time. They are guaranteed to be aware
(i.e. contain timezone information) for ``TimeRestriction`` instances
created by Airflow.
"""
earliest: DateTime | None
latest: DateTime | None
catchup: bool
class DagRunInfo(NamedTuple):
"""Information to schedule a DagRun.
Instances of this will be returned by timetables when they are asked to
schedule a DagRun creation.
"""
run_after: DateTime
"""The earliest time this DagRun is created and its tasks scheduled.
This **MUST** be "aware", i.e. contain timezone information.
"""
data_interval: DataInterval
"""The data interval this DagRun to operate over."""
@classmethod
def exact(cls, at: DateTime) -> DagRunInfo:
"""Represent a run on an exact time."""
return cls(run_after=at, data_interval=DataInterval.exact(at))
@classmethod
def interval(cls, start: DateTime, end: DateTime) -> DagRunInfo:
"""Represent a run on a continuous schedule.
In such a schedule, each data interval starts right after the previous
one ends, and each run is scheduled right after the interval ends. This
applies to all schedules prior to AIP-39 except ``@once`` and ``None``.
"""
return cls(run_after=end, data_interval=DataInterval(start, end))
@property
def logical_date(self: DagRunInfo) -> DateTime:
"""Infer the logical date to represent a DagRun.
This replaces ``execution_date`` in Airflow 2.1 and prior. The idea is
essentially the same, just a different name.
"""
return self.data_interval.start
@runtime_checkable
class Timetable(Protocol):
"""Protocol that all Timetable classes are expected to implement."""
description: str = ""
"""Human-readable description of the timetable.
For example, this can produce something like ``'At 21:30, only on Friday'``
from the cron expression ``'30 21 * * 5'``. This is used in the webserver UI.
"""
periodic: bool = True
"""Whether this timetable runs periodically.
This defaults to and should generally be *True*, but some special setups
like ``schedule=None`` and ``"@once"`` set it to *False*.
"""
can_run: bool = True
"""Whether this timetable can actually schedule runs.
This defaults to and should generally be *True*, but ``NullTimetable`` sets
this to *False*.
"""
run_ordering: Sequence[str] = ("data_interval_end", "execution_date")
"""How runs triggered from this timetable should be ordered in UI.
This should be a list of field names on the DAG run object.
"""
active_runs_limit: int | None = None
"""Override the max_active_runs parameter of any DAGs using this timetable.
This is called during DAG initializing, and will set the max_active_runs if
it returns a value. In most cases this should return None, but in some cases
(for example, the ContinuousTimetable) there are good reasons for limiting
the DAGRun parallelism.
"""
@classmethod
def deserialize(cls, data: dict[str, Any]) -> Timetable:
"""Deserialize a timetable from data.
This is called when a serialized DAG is deserialized. ``data`` will be
whatever was returned by ``serialize`` during DAG serialization. The
default implementation constructs the timetable without any arguments.
"""
return cls()
def serialize(self) -> dict[str, Any]:
"""Serialize the timetable for JSON encoding.
This is called during DAG serialization to store timetable information
in the database. This should return a JSON-serializable dict that will
be fed into ``deserialize`` when the DAG is deserialized. The default
implementation returns an empty dict.
"""
return {}
def validate(self) -> None:
"""Validate the timetable is correctly specified.
Override this method to provide run-time validation raised when a DAG
is put into a dagbag. The default implementation does nothing.
:raises: AirflowTimetableInvalid on validation failure.
"""
return
@property
def summary(self) -> str:
"""A short summary for the timetable.
This is used to display the timetable in the web UI. A cron expression
timetable, for example, can use this to display the expression. The
default implementation returns the timetable's type name.
"""
return type(self).__name__
def infer_manual_data_interval(self, *, run_after: DateTime) -> DataInterval:
"""When a DAG run is manually triggered, infer a data interval for it.
This is used for e.g. manually-triggered runs, where ``run_after`` would
be when the user triggers the run. The default implementation raises
``NotImplementedError``.
"""
raise NotImplementedError()
def next_dagrun_info(
self,
*,
last_automated_data_interval: DataInterval | None,
restriction: TimeRestriction,
) -> DagRunInfo | None:
"""Provide information to schedule the next DagRun.
The default implementation raises ``NotImplementedError``.
:param last_automated_data_interval: The data interval of the associated
DAG's last scheduled or backfilled run (manual runs not considered).
:param restriction: Restriction to apply when scheduling the DAG run.
See documentation of :class:`TimeRestriction` for details.
:return: Information on when the next DagRun can be scheduled. None
means a DagRun will not happen. This does not mean no more runs
will be scheduled even again for this DAG; the timetable can return
a DagRunInfo object when asked at another time.
"""
raise NotImplementedError()
def generate_run_id(
self,
*,
run_type: DagRunType,
logical_date: DateTime,
data_interval: DataInterval | None,
**extra,
) -> str:
return run_type.generate_run_id(logical_date)