:py:mod:`airflow.models.dagrun`
===============================
.. py:module:: airflow.models.dagrun
Module Contents
---------------
Classes
~~~~~~~
.. autoapisummary::
airflow.models.dagrun.TISchedulingDecision
airflow.models.dagrun.DagRun
.. py:class:: TISchedulingDecision
Bases: :py:obj:`NamedTuple`
Return type of :py:meth:`DagRun.task_instance_scheduling_decisions`.
.. py:attribute:: tis
:annotation: :List[airflow.models.taskinstance.TaskInstance]
.. py:attribute:: schedulable_tis
:annotation: :List[airflow.models.taskinstance.TaskInstance]
.. py:attribute:: changed_tis
:annotation: :bool
.. py:attribute:: unfinished_tasks
:annotation: :List[airflow.models.taskinstance.TaskInstance]
.. py:attribute:: finished_tasks
:annotation: :List[airflow.models.taskinstance.TaskInstance]
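Because ``TISchedulingDecision`` is a ``NamedTuple``, a caller can read its fields by name or unpack them positionally in declaration order. The sketch below uses a hypothetical stand-in class, ``SchedulingDecision``, with the same field names. Task instances are represented as plain strings, an assumption made only so that the example runs without Airflow installed.

.. code-block:: python

   from typing import List, NamedTuple


   class SchedulingDecision(NamedTuple):
       """Hypothetical stand-in mirroring TISchedulingDecision's field names."""

       tis: List[str]               # assumption: strings instead of TaskInstance objects
       schedulable_tis: List[str]
       changed_tis: bool
       unfinished_tasks: List[str]
       finished_tasks: List[str]


   decision = SchedulingDecision(
       tis=["extract", "load"],
       schedulable_tis=["load"],
       changed_tis=False,
       unfinished_tasks=["load"],
       finished_tasks=["extract"],
   )

   # Access a field by name...
   print(decision.schedulable_tis)  # ['load']

   # ...or unpack all five fields positionally, in declaration order.
   tis, schedulable, changed, unfinished, finished = decision
   print(changed)  # False

Reading fields by name is the safer choice for callers, since it keeps working if fields are ever appended to the tuple.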
.. py:class:: DagRun(dag_id: Optional[str] = None, run_id: Optional[str] = None, queued_at: Optional[datetime.datetime] = __NO_VALUE, execution_date: Optional[datetime.datetime] = None, start_date: Optional[datetime.datetime] = None, external_trigger: Optional[bool] = None, conf: Optional[Any] = None, state: Optional[airflow.utils.state.DagRunState] = None, run_type: Optional[str] = None, dag_hash: Optional[str] = None, creating_job_id: Optional[int] = None, data_interval: Optional[Tuple[datetime.datetime, datetime.datetime]] = None)
Bases: :py:obj:`airflow.models.base.Base`, :py:obj:`airflow.utils.log.logging_mixin.LoggingMixin`
DagRun describes an instance of a DAG. It can be created
by the scheduler (for regular runs) or by an external trigger.
.. py:attribute:: __tablename__
:annotation: = dag_run
.. py:attribute:: id
.. py:attribute:: dag_id
.. py:attribute:: queued_at
.. py:attribute:: execution_date
.. py:attribute:: start_date
.. py:attribute:: end_date
.. py:attribute:: run_id
.. py:attribute:: creating_job_id
.. py:attribute:: external_trigger
.. py:attribute:: run_type
.. py:attribute:: conf
.. py:attribute:: data_interval_start
.. py:attribute:: data_interval_end
.. py:attribute:: last_scheduling_decision
.. py:attribute:: dag_hash
.. py:attribute:: dag
.. py:attribute:: __table_args__
.. py:attribute:: task_instances
.. py:attribute:: DEFAULT_DAGRUNS_TO_EXAMINE
.. py:method:: __repr__(self)
Return repr(self).
.. py:method:: logical_date(self) -> datetime.datetime
:property:
.. py:method:: get_state(self)
.. py:method:: set_state(self, state: airflow.utils.state.DagRunState)
.. py:method:: state(self)
:property:
.. py:method:: refresh_from_db(self, session: sqlalchemy.orm.session.Session = None)
Reload the current DagRun from the database.
:param session: database session
:type session: Session
.. py:method:: active_runs_of_dags(cls, dag_ids=None, only_running=False, session=None) -> Dict[str, int]
:classmethod:
Get the number of active DAG runs for each DAG.
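The sketch below shows the kind of per-DAG count this classmethod returns, computed in memory from ``(dag_id, state)`` pairs. The helper ``count_active_runs`` is hypothetical, and treating "active" as *queued or running* (or only *running* when ``only_running`` is set) is our assumption about how the flag behaves.

.. code-block:: python

   from collections import Counter
   from typing import Dict, Iterable, Optional, Tuple


   def count_active_runs(
       runs: Iterable[Tuple[str, str]],
       dag_ids: Optional[Iterable[str]] = None,
       only_running: bool = False,
   ) -> Dict[str, int]:
       """Count active runs per DAG from (dag_id, state) pairs (hypothetical helper)."""
       # Assumption: "active" = queued or running, narrowed to running if only_running.
       active_states = {"running"} if only_running else {"queued", "running"}
       wanted = set(dag_ids) if dag_ids is not None else None
       counts = Counter(
           dag_id
           for dag_id, state in runs
           if state in active_states and (wanted is None or dag_id in wanted)
       )
       return dict(counts)


   runs = [("etl", "running"), ("etl", "queued"), ("report", "success"), ("report", "running")]
   print(count_active_runs(runs))                     # {'etl': 2, 'report': 1}
   print(count_active_runs(runs, only_running=True))  # {'etl': 1, 'report': 1}

DAGs with no active runs simply do not appear in the result, so callers should use ``.get(dag_id, 0)`` when looking up a count.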
.. py:method:: next_dagruns_to_examine(cls, state: airflow.utils.state.DagRunState, session: sqlalchemy.orm.session.Session, max_number: Optional[int] = None)
:classmethod:
Return the next DagRuns that the scheduler should attempt to schedule.
This returns zero or more DagRun rows that are row-level-locked with a ``SELECT ... FOR UPDATE``
query, so make all scheduling decisions about them in a single transaction: the locks are released
as soon as that transaction is committed.
:rtype: list[airflow.models.DagRun]
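The locking contract above (select rows, decide, commit once) can be illustrated with the standard-library ``sqlite3`` module. SQLite has no ``SELECT ... FOR UPDATE``, so this sketch swaps in ``BEGIN IMMEDIATE``, which takes a database-wide write lock instead of row locks. The two-column ``dag_run`` table and the ``claim_queued_runs`` helper are hypothetical and are not Airflow's schema or code.

.. code-block:: python

   import sqlite3
   from typing import List


   def claim_queued_runs(conn: sqlite3.Connection, max_number: int = 10) -> List[int]:
       """Select queued runs and mark them running inside ONE transaction."""
       # The connection uses isolation_level=None, so we issue BEGIN/COMMIT ourselves.
       # BEGIN IMMEDIATE takes SQLite's write lock up front (SQLite has no FOR UPDATE).
       conn.execute("BEGIN IMMEDIATE")
       try:
           rows = conn.execute(
               "SELECT id FROM dag_run WHERE state = 'queued' ORDER BY id LIMIT ?",
               (max_number,),
           ).fetchall()
           ids = [row[0] for row in rows]
           # Every decision about the selected rows is made before COMMIT.
           conn.executemany(
               "UPDATE dag_run SET state = 'running' WHERE id = ?", [(i,) for i in ids]
           )
           conn.execute("COMMIT")  # the lock is released here
       except Exception:
           conn.execute("ROLLBACK")
           raise
       return ids


   conn = sqlite3.connect(":memory:", isolation_level=None)
   conn.execute("CREATE TABLE dag_run (id INTEGER PRIMARY KEY, state TEXT)")
   conn.executemany(
       "INSERT INTO dag_run (state) VALUES (?)", [("queued",), ("running",), ("queued",)]
   )
   picked = claim_queued_runs(conn)
   print(picked)  # [1, 3]

Committing partway through, for example after the ``SELECT`` but before the ``UPDATE``, would release the lock and let another scheduler claim the same rows.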
.. py:method:: find(cls, dag_id: Optional[Union[str, List[str]]] = None, run_id: Optional[str] = None, execution_date: Optional[Union[datetime.datetime, List[datetime.datetime]]] = None, state: Optional[airflow.utils.state.DagRunState] = None, external_trigger: Optional[bool] = None, no_backfills: bool = False, run_type: Optional[airflow.utils.types.DagRunType] = None, session: sqlalchemy.orm.session.Session = None, execution_start_date: Optional[datetime.datetime] = None, execution_end_date: Optional[datetime.datetime] = None) -> List[DagRun]
:classmethod:
Return a list of DAG runs matching the given search criteria.
:param dag_id: the dag_id, or list of dag_ids, to find DAG runs for
:type dag_id: str or list[str]
:param run_id: the run ID of the DAG run to find
:type run_id: str
:param run_type: type of DagRun
:type run_type: airflow.utils.types.DagRunType
:param execution_date: the execution date
:type execution_date: datetime.datetime or list[datetime.datetime]
:param state: the state of the dag run
:type state: DagRunState
:param external_trigger: whether this dag run is externally triggered
:type external_trigger: bool
:param no_backfills: if True, exclude backfill runs; if False (the default), return all runs
:type no_backfills: bool
:param session: database session
:type session: sqlalchemy.orm.session.Session
:param execution_start_date: earliest execution date of the DAG runs to return
:type execution_start_date: datetime.datetime
:param execution_end_date: latest execution date of the DAG runs to return
:type execution_end_date: datetime.datetime
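The filtering semantics of these parameters can be sketched in memory. In this sketch, ``None`` means "do not filter on this field", a single ``dag_id`` string is normalized to a one-element list, and ``no_backfills`` drops runs whose run type is ``"backfill"``. The ``Run`` dataclass, the ``find_runs`` helper, the run-type strings, inclusive date bounds, and ordering by execution date are all assumptions made for illustration, not a copy of the SQL query.

.. code-block:: python

   from dataclasses import dataclass
   from datetime import datetime
   from typing import List, Optional, Union


   @dataclass
   class Run:
       """Hypothetical in-memory stand-in for a dag_run row."""

       dag_id: str
       run_id: str
       execution_date: datetime
       state: str
       run_type: str


   def find_runs(
       runs: List[Run],
       dag_id: Optional[Union[str, List[str]]] = None,
       state: Optional[str] = None,
       no_backfills: bool = False,
       execution_start_date: Optional[datetime] = None,
       execution_end_date: Optional[datetime] = None,
   ) -> List[Run]:
       """Apply find()-style filters; None means 'do not filter on this field'."""
       dag_ids = [dag_id] if isinstance(dag_id, str) else dag_id
       result = []
       for run in runs:
           if dag_ids is not None and run.dag_id not in dag_ids:
               continue
           if state is not None and run.state != state:
               continue
           if no_backfills and run.run_type == "backfill":
               continue
           # Assumption: both date bounds are inclusive.
           if execution_start_date is not None and run.execution_date < execution_start_date:
               continue
           if execution_end_date is not None and run.execution_date > execution_end_date:
               continue
           result.append(run)
       # Assumption: results come back ordered by execution date.
       return sorted(result, key=lambda r: r.execution_date)


   runs = [
       Run("etl", "scheduled__a", datetime(2021, 1, 2), "success", "scheduled"),
       Run("etl", "backfill__b", datetime(2021, 1, 1), "success", "backfill"),
       Run("report", "manual__c", datetime(2021, 1, 3), "failed", "manual"),
   ]
   found = find_runs(runs, dag_id=["etl", "report"], no_backfills=True)
   print([r.run_id for r in found])  # ['scheduled__a', 'manual__c']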
.. py:method:: find_duplicate(cls, dag_id: str, run_id: str, execution_date: datetime.datetime, session: sqlalchemy.orm.session.Session = None) -> Optional[DagRun]
:classmethod:
Return an existing run for the DAG with a specific run_id or execution_date.
*None* is returned if no such DAG run is found.
:param dag_id: the dag_id to find duplicates for
:type dag_id: str
:param run_id: the run ID to check for duplicates
:type run_id: str
:param execution_date: the execution date
:type execution_date: datetime.datetime
:param session: database session
:type session: sqlalchemy.orm.session.Session
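The key point of this lookup is the *or*: a run counts as a duplicate if it belongs to the same DAG and shares *either* the ``run_id`` *or* the ``execution_date``. The in-memory sketch below shows that rule. ``Run`` and ``find_duplicate`` here are hypothetical stand-ins, not Airflow's classes.

.. code-block:: python

   from datetime import datetime
   from typing import List, NamedTuple, Optional


   class Run(NamedTuple):
       """Hypothetical in-memory stand-in for a dag_run row."""

       dag_id: str
       run_id: str
       execution_date: datetime


   def find_duplicate(
       runs: List[Run], dag_id: str, run_id: str, execution_date: datetime
   ) -> Optional[Run]:
       """Return a run of ``dag_id`` clashing on run_id OR execution_date, else None."""
       for run in runs:
           if run.dag_id == dag_id and (
               run.run_id == run_id or run.execution_date == execution_date
           ):
               return run
       return None


   existing = [Run("etl", "manual__1", datetime(2021, 1, 1))]
   # Different run_id but same execution date: still a duplicate.
   print(find_duplicate(existing, "etl", "manual__2", datetime(2021, 1, 1)) is not None)  # True
   # Same run_id in another DAG: not a duplicate.
   print(find_duplicate(existing, "report", "manual__1", datetime(2021, 1, 1)))  # None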
.. py:method:: generate_run_id(run_type: airflow.utils.types.DagRunType, execution_date: datetime.datetime) -> str
:staticmethod:
Generate a run ID from the run type and execution date.
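We believe Airflow 2.x builds the ID as ``<run_type>__<ISO 8601 execution date>``, for example ``scheduled__2021-01-01T00:00:00+00:00``. Treat that exact format as an assumption and check it against your installed version. The standalone function below reproduces the assumed format, taking the run type as a plain string rather than a ``DagRunType``.

.. code-block:: python

   from datetime import datetime, timezone


   def generate_run_id(run_type: str, execution_date: datetime) -> str:
       """Build '<run_type>__<ISO 8601 execution date>' (assumed format)."""
       return f"{run_type}__{execution_date.isoformat()}"


   print(generate_run_id("scheduled", datetime(2021, 1, 1, tzinfo=timezone.utc)))
   # scheduled__2021-01-01T00:00:00+00:00

Embedding the run type as a prefix makes runs of different types easy to tell apart by ID alone.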
.. py:method:: get_task_instances(self, state: Optional[Iterable[airflow.utils.state.TaskInstanceState]] = None, session=None) -> Iterable[airflow.models.taskinstance.TaskInstance]
Return the task instances for this DAG run, optionally filtered by ``state``.
.. py:method:: get_task_instance(self, task_id: str, session: sqlalchemy.orm.session.Session = None) -> Optional[airflow.models.taskinstance.TaskInstance]
Return the task instance specified by ``task_id`` for this DAG run.
:param task_id: the task id
:type task_id: str
:param session: SQLAlchemy ORM session
:type session: Session
.. py:method:: get_dag(self) -> airflow.models.dag.DAG
Returns the Dag associated with this DagRun.
:return: DAG
.. py:method:: get_previous_dagrun(self, state: Optional[airflow.utils.state.DagRunState] = None, session: sqlalchemy.orm.session.Session = None) -> Optional[DagRun]
Return the previous DagRun, if there is one.
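"Previous" can be read as the latest run of the same DAG whose execution date is strictly earlier than the current run's, optionally restricted to a given ``state``. The in-memory sketch below applies that rule. The ``Run`` tuple and ``previous_run`` helper are hypothetical stand-ins, and the sketch assumes all runs belong to one DAG.

.. code-block:: python

   from datetime import datetime
   from typing import List, NamedTuple, Optional


   class Run(NamedTuple):
       """Hypothetical in-memory stand-in for a dag_run row of one DAG."""

       execution_date: datetime
       state: str


   def previous_run(runs: List[Run], current: Run, state: Optional[str] = None) -> Optional[Run]:
       """Return the latest run strictly before ``current``, optionally in one state."""
       candidates = [
           run
           for run in runs
           if run.execution_date < current.execution_date
           and (state is None or run.state == state)
       ]
       # max() with default=None covers the "no previous run" case.
       return max(candidates, key=lambda run: run.execution_date, default=None)


   runs = [
       Run(datetime(2021, 1, 1), "success"),
       Run(datetime(2021, 1, 2), "failed"),
       Run(datetime(2021, 1, 3), "running"),
   ]
   print(previous_run(runs, runs[2]).state)                   # failed
   print(previous_run(runs, runs[2], state="success").state)  # success
   print(previous_run(runs, runs[0]))                         # None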
.. py:method:: get_previous_scheduled_dagrun(self, session: sqlalchemy.orm.session.Session = None) -> Optional[DagRun]
Return the previous *scheduled* DagRun, if there is one.
.. py:method:: update_state(self, session: sqlalchemy.orm.session.Session = None, execute_callbacks: bool = True) -> Tuple[List[airflow.models.taskinstance.TaskInstance], Optional[airflow.utils.callback_requests.DagCallbackRequest]]
Determine the overall state of the DagRun based on the state
of its TaskInstances.
:param session: SQLAlchemy ORM session
:type session: Session
:param execute_callbacks: whether DAG callbacks (success/failure, SLA, etc.) are invoked
directly (the default, ``True``) or recorded as a pending request in the ``callback`` property
:type execute_callbacks: bool
:return: a tuple of the task instances that can be scheduled in the current loop and the
callback request, if any, that needs to be executed
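The roll-up from task-instance states to a run state can be sketched as follows. This is a deliberately simplified rule of our own, not Airflow's full logic. It ignores deadlock detection and callbacks, and it assumes that *success*/*skipped* count as success while *failed*/*upstream_failed* count as failure. Under this rule, any unfinished task instance keeps the run running; once everything has finished, a failed leaf task fails the run, and otherwise the run succeeds. ``decide_run_state`` is a hypothetical name.

.. code-block:: python

   from typing import Dict, Optional, Set

   # Assumed state groupings for this sketch.
   SUCCESS_STATES = {"success", "skipped"}
   FAILED_STATES = {"failed", "upstream_failed"}
   FINISHED_STATES = SUCCESS_STATES | FAILED_STATES


   def decide_run_state(ti_states: Dict[str, Optional[str]], leaf_task_ids: Set[str]) -> str:
       """Simplified roll-up of task-instance states into a run state (hypothetical)."""
       # Any task instance not yet in a finished state keeps the run going.
       if any(state not in FINISHED_STATES for state in ti_states.values()):
           return "running"
       # Everything finished: a failed leaf fails the run; otherwise it succeeded.
       if any(ti_states[task_id] in FAILED_STATES for task_id in leaf_task_ids):
           return "failed"
       return "success"


   print(decide_run_state({"extract": "success", "load": "running"}, {"load"}))          # running
   print(decide_run_state({"extract": "success", "load": "upstream_failed"}, {"load"}))  # failed
   print(decide_run_state({"extract": "success", "load": "skipped"}, {"load"}))          # success

Checking only the leaves is enough in this sketch because an upstream failure is assumed to propagate to the leaves as ``upstream_failed``.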
.. py:method:: task_instance_scheduling_decisions(self, session: sqlalchemy.orm.session.Session = None) -> TISchedulingDecision
.. py:method:: verify_integrity(self, session: sqlalchemy.orm.session.Session = None)
Verify the DagRun by checking for tasks that have been removed from the DAG or that are not yet in
the database. Task instances of removed tasks are set to the removed state, and task instances are
added for tasks that are missing.
:param session: SQLAlchemy ORM session
:type session: Session
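The reconciliation step can be pictured as a comparison of two sets of task IDs: those in the DAG now and those that already have task instances. The sketch below works on a plain ``task_id -> state`` dict. ``reconcile`` is a hypothetical helper, and using ``None`` as the state of a newly added task instance is our assumption.

.. code-block:: python

   from typing import Dict, Iterable, Optional


   def reconcile(
       dag_task_ids: Iterable[str], ti_states: Dict[str, Optional[str]]
   ) -> Dict[str, Optional[str]]:
       """Return task-instance states updated against the DAG's current task list."""
       wanted = set(dag_task_ids)
       # Task instances whose task is no longer in the DAG are marked removed.
       result = {
           task_id: (state if task_id in wanted else "removed")
           for task_id, state in ti_states.items()
       }
       # Tasks without a task instance get a new one with no state yet (None).
       for task_id in sorted(wanted - set(result)):
           result[task_id] = None
       return result


   print(reconcile(["extract", "load"], {"extract": "success", "legacy": "success"}))
   # {'extract': 'success', 'legacy': 'removed', 'load': None}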
.. py:method:: get_run(session: sqlalchemy.orm.session.Session, dag_id: str, execution_date: datetime.datetime) -> Optional[DagRun]
:staticmethod:
Get a single DAG run.
:meta private:
:param session: SQLAlchemy ORM session
:type session: Session
:param dag_id: DAG ID
:type dag_id: str
:param execution_date: execution date
:type execution_date: datetime.datetime
:return: the DagRun corresponding to the given dag_id and execution date
if one exists, otherwise None.
:rtype: airflow.models.DagRun
.. py:method:: is_backfill(self) -> bool
:property:
.. py:method:: get_latest_runs(cls, session=None) -> List[DagRun]
:classmethod:
Return the latest DagRun for each DAG.
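"Latest" here can be read as the run with the greatest execution date within each ``dag_id`` group. The in-memory sketch below keeps one winner per DAG in a single pass. ``Run`` and ``latest_runs`` are hypothetical stand-ins, and the database method may well do the grouping in SQL instead.

.. code-block:: python

   from datetime import datetime
   from typing import Dict, List, NamedTuple


   class Run(NamedTuple):
       """Hypothetical in-memory stand-in for a dag_run row."""

       dag_id: str
       execution_date: datetime


   def latest_runs(runs: List[Run]) -> List[Run]:
       """Keep only the run with the greatest execution_date for each dag_id."""
       latest: Dict[str, Run] = {}
       for run in runs:
           best = latest.get(run.dag_id)
           if best is None or run.execution_date > best.execution_date:
               latest[run.dag_id] = run
       return list(latest.values())


   runs = [
       Run("etl", datetime(2021, 1, 1)),
       Run("report", datetime(2021, 1, 5)),
       Run("etl", datetime(2021, 1, 3)),
   ]
   print([(r.dag_id, r.execution_date.day) for r in latest_runs(runs)])
   # [('etl', 3), ('report', 5)]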
.. py:method:: schedule_tis(self, schedulable_tis: Iterable[airflow.models.taskinstance.TaskInstance], session: sqlalchemy.orm.session.Session = None) -> int
Set the given task instances into the scheduled state.
Each element of ``schedulable_tis`` should have its ``task`` attribute already set.
Any DummyOperator without callbacks is instead set straight to the success state.
All the TIs should belong to this DagRun, but because this code is on the hot path this is not checked:
it is the caller's responsibility to call this function only with TIs from a single DAG run.
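The DummyOperator shortcut can be sketched as a simple partition: a task instance with nothing to execute and no callbacks goes straight to success, and every other task instance becomes scheduled. The ``TI`` dataclass, its ``is_dummy``/``has_callbacks`` flags, and reading the ``int`` return value as "number of task instances updated" are all assumptions made for this illustration. Like the real method, the sketch does not check that the TIs share a DAG run.

.. code-block:: python

   from dataclasses import dataclass
   from typing import Iterable, Optional


   @dataclass
   class TI:
       """Hypothetical stand-in for a task instance."""

       task_id: str
       is_dummy: bool = False       # assumption: the task is a DummyOperator
       has_callbacks: bool = False  # assumption: the task defines callbacks
       state: Optional[str] = None


   def schedule_tis(schedulable_tis: Iterable[TI]) -> int:
       """Move TIs forward and return how many were updated (assumed meaning of the int)."""
       count = 0
       for ti in schedulable_tis:
           if ti.is_dummy and not ti.has_callbacks:
               ti.state = "success"  # nothing to run, so skip the executor entirely
           else:
               ti.state = "scheduled"
           count += 1
       return count


   tis = [TI("start", is_dummy=True), TI("work"), TI("notify", is_dummy=True, has_callbacks=True)]
   print(schedule_tis(tis))         # 3
   print([ti.state for ti in tis])  # ['success', 'scheduled', 'scheduled']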