blob: 9ffd26a8bdee0379802989a16091a218cfaacfc3 [file] [log] [blame]
:py:mod:`airflow.models.dag`
============================
.. py:module:: airflow.models.dag
Module Contents
---------------
Classes
~~~~~~~
.. autoapisummary::
airflow.models.dag.DAG
airflow.models.dag.DagTag
airflow.models.dag.DagOwnerAttributes
airflow.models.dag.DagModel
airflow.models.dag.DagContext
Functions
~~~~~~~~~
.. autoapisummary::
airflow.models.dag.create_timetable
airflow.models.dag.get_last_dagrun
airflow.models.dag.get_dataset_triggered_next_run_info
airflow.models.dag.dag
Attributes
~~~~~~~~~~
.. autoapisummary::
airflow.models.dag.log
airflow.models.dag.DEFAULT_VIEW_PRESETS
airflow.models.dag.ORIENTATION_PRESETS
airflow.models.dag.TAG_MAX_LEN
airflow.models.dag.DagStateChangeCallback
airflow.models.dag.ScheduleInterval
airflow.models.dag.ScheduleIntervalArg
airflow.models.dag.ScheduleArg
airflow.models.dag.SLAMissCallback
airflow.models.dag.DEFAULT_SCHEDULE_INTERVAL
.. py:data:: log
.. py:data:: DEFAULT_VIEW_PRESETS
:annotation: = ['grid', 'graph', 'duration', 'gantt', 'landing_times']
.. py:data:: ORIENTATION_PRESETS
:annotation: = ['LR', 'TB', 'RL', 'BT']
.. py:data:: TAG_MAX_LEN
:annotation: = 100
.. py:data:: DagStateChangeCallback
.. py:data:: ScheduleInterval
.. py:data:: ScheduleIntervalArg
.. py:data:: ScheduleArg
.. py:data:: SLAMissCallback
.. py:data:: DEFAULT_SCHEDULE_INTERVAL
.. py:exception:: InconsistentDataInterval(instance, start_field_name, end_field_name)
Bases: :py:obj:`airflow.exceptions.AirflowException`
Exception raised when a model populates data interval fields incorrectly.
The data interval fields should either both be None (for runs scheduled
prior to AIP-39), or both be datetime (for runs scheduled after AIP-39 is
implemented). This is raised if exactly one of the fields is None.
.. py:method:: __str__()
Return str(self).
.. py:function:: create_timetable(interval, timezone)
Create a Timetable instance from a ``schedule_interval`` argument.
.. py:function:: get_last_dagrun(dag_id, session, include_externally_triggered=False)
Returns the last dag run for a dag, None if there was none.
Last dag run can be any type of run eg. scheduled or backfilled.
Overridden DagRuns are ignored.
.. py:function:: get_dataset_triggered_next_run_info(dag_ids, *, session)
Given a list of dag_ids, get string representing how close any that are dataset triggered are
their next run, e.g. "1 of 2 datasets updated"
.. py:class:: DAG(dag_id, description = None, schedule = NOTSET, schedule_interval = NOTSET, timetable = None, start_date = None, end_date = None, full_filepath = None, template_searchpath = None, template_undefined = jinja2.StrictUndefined, user_defined_macros = None, user_defined_filters = None, default_args = None, concurrency = None, max_active_tasks = conf.getint('core', 'max_active_tasks_per_dag'), max_active_runs = conf.getint('core', 'max_active_runs_per_dag'), dagrun_timeout = None, sla_miss_callback = None, default_view = conf.get_mandatory_value('webserver', 'dag_default_view').lower(), orientation = conf.get_mandatory_value('webserver', 'dag_orientation'), catchup = conf.getboolean('scheduler', 'catchup_by_default'), on_success_callback = None, on_failure_callback = None, doc_md = None, params = None, access_control = None, is_paused_upon_creation = None, jinja_environment_kwargs = None, render_template_as_native_obj = False, tags = None, owner_links = None, auto_register = True)
Bases: :py:obj:`airflow.utils.log.logging_mixin.LoggingMixin`
A dag (directed acyclic graph) is a collection of tasks with directional
dependencies. A dag also has a schedule, a start date and an end date
(optional). For each schedule, (say daily or hourly), the DAG needs to run
each individual tasks as their dependencies are met. Certain tasks have
the property of depending on their own past, meaning that they can't run
until their previous schedule (and upstream tasks) are completed.
DAGs essentially act as namespaces for tasks. A task_id can only be
added once to a DAG.
Note that if you plan to use time zones all the dates provided should be pendulum
dates. See :ref:`timezone_aware_dags`.
.. versionadded:: 2.4
The *schedule* argument to specify either time-based scheduling logic
(timetable), or dataset-driven triggers.
.. deprecated:: 2.4
The arguments *schedule_interval* and *timetable*. Their functionalities
are merged into the new *schedule* argument.
:param dag_id: The id of the DAG; must consist exclusively of alphanumeric
characters, dashes, dots and underscores (all ASCII)
:param description: The description for the DAG to e.g. be shown on the webserver
:param schedule: Defines the rules according to which DAG runs are scheduled. Can
accept cron string, timedelta object, Timetable, or list of Dataset objects.
See also :doc:`/howto/timetable`.
:param start_date: The timestamp from which the scheduler will
attempt to backfill
:param end_date: A date beyond which your DAG won't run, leave to None
for open ended scheduling
:param template_searchpath: This list of folders (non relative)
defines where jinja will look for your templates. Order matters.
Note that jinja/airflow includes the path of your DAG file by
default
:param template_undefined: Template undefined type.
:param user_defined_macros: a dictionary of macros that will be exposed
in your jinja templates. For example, passing ``dict(foo='bar')``
to this argument allows you to ``{{ foo }}`` in all jinja
templates related to this DAG. Note that you can pass any
type of object here.
:param user_defined_filters: a dictionary of filters that will be exposed
in your jinja templates. For example, passing
``dict(hello=lambda name: 'Hello %s' % name)`` to this argument allows
you to ``{{ 'world' | hello }}`` in all jinja templates related to
this DAG.
:param default_args: A dictionary of default parameters to be used
as constructor keyword parameters when initialising operators.
Note that operators have the same hook, and precede those defined
here, meaning that if your dict contains `'depends_on_past': True`
here and `'depends_on_past': False` in the operator's call
`default_args`, the actual value will be `False`.
:param params: a dictionary of DAG level parameters that are made
accessible in templates, namespaced under `params`. These
params can be overridden at the task level.
:param max_active_tasks: the number of task instances allowed to run
concurrently
:param max_active_runs: maximum number of active DAG runs, beyond this
number of DAG runs in a running state, the scheduler won't create
new active DAG runs
:param dagrun_timeout: specify how long a DagRun should be up before
timing out / failing, so that new DagRuns can be created. The timeout
is only enforced for scheduled DagRuns.
:param sla_miss_callback: specify a function to call when reporting SLA
timeouts. See :ref:`sla_miss_callback<concepts:sla_miss_callback>` for
more information about the function signature and parameters that are
passed to the callback.
:param default_view: Specify DAG default view (grid, graph, duration,
gantt, landing_times), default grid
:param orientation: Specify DAG orientation in graph view (LR, TB, RL, BT), default LR
:param catchup: Perform scheduler catchup (or only run latest)? Defaults to True
:param on_failure_callback: A function to be called when a DagRun of this dag fails.
A context dictionary is passed as a single parameter to this function.
:param on_success_callback: Much like the ``on_failure_callback`` except
that it is executed when the dag succeeds.
:param access_control: Specify optional DAG-level actions, e.g.,
"{'role1': {'can_read'}, 'role2': {'can_read', 'can_edit', 'can_delete'}}"
:param is_paused_upon_creation: Specifies if the dag is paused when created for the first time.
If the dag exists already, this flag will be ignored. If this optional parameter
is not specified, the global config setting will be used.
:param jinja_environment_kwargs: additional configuration options to be passed to Jinja
``Environment`` for template rendering
**Example**: to avoid Jinja from removing a trailing newline from template strings ::
DAG(dag_id='my-dag',
jinja_environment_kwargs={
'keep_trailing_newline': True,
# some other jinja2 Environment options here
}
)
**See**: `Jinja Environment documentation
<https://jinja.palletsprojects.com/en/2.11.x/api/#jinja2.Environment>`_
:param render_template_as_native_obj: If True, uses a Jinja ``NativeEnvironment``
to render templates as native Python types. If False, a Jinja
``Environment`` is used to render templates as string values.
:param tags: List of tags to help filtering DAGs in the UI.
:param owner_links: Dict of owners and their links, that will be clickable on the DAGs view UI.
Can be used as an HTTP link (for example the link to your Slack channel), or a mailto link.
e.g: {"dag_owner": "https://airflow.apache.org/"}
:param auto_register: Automatically register this DAG when it is used in a ``with`` block
.. py:attribute:: fileloc
:annotation: :str
File path that needs to be imported to load this DAG or subdag.
This may not be an actual file on disk in the case when this DAG is loaded
from a ZIP file or other DAG distribution format.
.. py:attribute:: parent_dag
:annotation: :DAG | None
.. py:method:: get_doc_md(doc_md)
.. py:method:: validate()
Validate the DAG has a coherent setup.
This is called by the DAG bag before bagging the DAG.
.. py:method:: __repr__()
Return repr(self).
.. py:method:: __eq__(other)
Return self==value.
.. py:method:: __ne__(other)
Return self!=value.
.. py:method:: __lt__(other)
Return self<value.
.. py:method:: __hash__()
Return hash(self).
.. py:method:: __enter__()
.. py:method:: __exit__(_type, _value, _tb)
.. py:method:: date_range(start_date, num = None, end_date = None)
.. py:method:: is_fixed_time_schedule()
.. py:method:: following_schedule(dttm)
Calculates the following schedule for this dag in UTC.
:param dttm: utc datetime
:return: utc datetime
.. py:method:: previous_schedule(dttm)
.. py:method:: get_next_data_interval(dag_model)
Get the data interval of the next scheduled run.
For compatibility, this method infers the data interval from the DAG's
schedule if the run does not have an explicit one set, which is possible
for runs created prior to AIP-39.
This function is private to Airflow core and should not be depended as a
part of the Python API.
:meta private:
.. py:method:: get_run_data_interval(run)
Get the data interval of this run.
For compatibility, this method infers the data interval from the DAG's
schedule if the run does not have an explicit one set, which is possible for
runs created prior to AIP-39.
This function is private to Airflow core and should not be depended as a
part of the Python API.
:meta private:
.. py:method:: infer_automated_data_interval(logical_date)
Infer a data interval for a run against this DAG.
This method is used to bridge runs created prior to AIP-39
implementation, which do not have an explicit data interval. Therefore,
this method only considers ``schedule_interval`` values valid prior to
Airflow 2.2.
DO NOT use this method is there is a known data interval.
.. py:method:: next_dagrun_info(last_automated_dagrun, *, restricted = True)
Get information about the next DagRun of this dag after ``date_last_automated_dagrun``.
This calculates what time interval the next DagRun should operate on
(its execution date) and when it can be scheduled, according to the
dag's timetable, start_date, end_date, etc. This doesn't check max
active run or any other "max_active_tasks" type limits, but only
performs calculations based on the various date and interval fields of
this dag and its tasks.
:param last_automated_dagrun: The ``max(execution_date)`` of
existing "automated" DagRuns for this dag (scheduled or backfill,
but not manual).
:param restricted: If set to *False* (default is *True*), ignore
``start_date``, ``end_date``, and ``catchup`` specified on the DAG
or tasks.
:return: DagRunInfo of the next dagrun, or None if a dagrun is not
going to be scheduled.
.. py:method:: next_dagrun_after_date(date_last_automated_dagrun)
.. py:method:: iter_dagrun_infos_between(earliest, latest, *, align = True)
Yield DagRunInfo using this DAG's timetable between given interval.
DagRunInfo instances yielded if their ``logical_date`` is not earlier
than ``earliest``, nor later than ``latest``. The instances are ordered
by their ``logical_date`` from earliest to latest.
If ``align`` is ``False``, the first run will happen immediately on
``earliest``, even if it does not fall on the logical timetable schedule.
The default is ``True``, but subdags will ignore this value and always
behave as if this is set to ``False`` for backward compatibility.
Example: A DAG is scheduled to run every midnight (``0 0 * * *``). If
``earliest`` is ``2021-06-03 23:00:00``, the first DagRunInfo would be
``2021-06-03 23:00:00`` if ``align=False``, and ``2021-06-04 00:00:00``
if ``align=True``.
.. py:method:: get_run_dates(start_date, end_date=None)
Returns a list of dates between the interval received as parameter using this
dag's schedule interval. Returned dates can be used for execution dates.
:param start_date: The start date of the interval.
:param end_date: The end date of the interval. Defaults to ``timezone.utcnow()``.
:return: A list of dates within the interval following the dag's schedule.
:rtype: list
.. py:method:: normalize_schedule(dttm)
.. py:method:: get_last_dagrun(session=NEW_SESSION, include_externally_triggered=False)
.. py:method:: has_dag_runs(session=NEW_SESSION, include_externally_triggered=True)
.. py:method:: dag_id()
:property:
.. py:method:: is_subdag()
:property:
.. py:method:: full_filepath()
:property:
:meta private:
.. py:method:: concurrency()
:property:
.. py:method:: max_active_tasks()
:property:
.. py:method:: access_control()
:property:
.. py:method:: description()
:property:
.. py:method:: default_view()
:property:
.. py:method:: pickle_id()
:property:
.. py:method:: param(name, default = NOTSET)
Return a DagParam object for current dag.
:param name: dag parameter name.
:param default: fallback value for dag parameter.
:return: DagParam instance for specified name and current dag.
.. py:method:: tasks()
:property:
.. py:method:: task_ids()
:property:
.. py:method:: task_group()
:property:
.. py:method:: filepath()
:property:
:meta private:
.. py:method:: relative_fileloc()
:property:
File location of the importable dag 'file' relative to the configured DAGs folder.
.. py:method:: folder()
:property:
Folder location of where the DAG object is instantiated.
.. py:method:: owner()
:property:
Return list of all owners found in DAG tasks.
:return: Comma separated list of owners in DAG tasks
:rtype: str
.. py:method:: allow_future_exec_dates()
:property:
.. py:method:: get_concurrency_reached(session=NEW_SESSION)
Returns a boolean indicating whether the max_active_tasks limit for this DAG
has been reached
.. py:method:: concurrency_reached()
:property:
This attribute is deprecated. Please use `airflow.models.DAG.get_concurrency_reached` method.
.. py:method:: get_is_active(session=NEW_SESSION)
Returns a boolean indicating whether this DAG is active
.. py:method:: get_is_paused(session=NEW_SESSION)
Returns a boolean indicating whether this DAG is paused
.. py:method:: is_paused()
:property:
This attribute is deprecated. Please use `airflow.models.DAG.get_is_paused` method.
.. py:method:: normalized_schedule_interval()
:property:
.. py:method:: handle_callback(dagrun, success=True, reason=None, session=NEW_SESSION)
Triggers the appropriate callback depending on the value of success, namely the
on_failure_callback or on_success_callback. This method gets the context of a
single TaskInstance part of this DagRun and passes that to the callable along
with a 'reason', primarily to differentiate DagRun failures.
.. note: The logs end up in
``$AIRFLOW_HOME/logs/scheduler/latest/PROJECT/DAG_FILE.py.log``
:param dagrun: DagRun object
:param success: Flag to specify if failure or success callback should be called
:param reason: Completion reason
:param session: Database session
.. py:method:: get_active_runs()
Returns a list of dag run execution dates currently running
:return: List of execution dates
.. py:method:: get_num_active_runs(external_trigger=None, only_running=True, session=NEW_SESSION)
Returns the number of active "running" dag runs
:param external_trigger: True for externally triggered active dag runs
:param session:
:return: number greater than 0 for active dag runs
.. py:method:: get_dagrun(execution_date = None, run_id = None, session = NEW_SESSION)
Returns the dag run for a given execution date or run_id if it exists, otherwise
none.
:param execution_date: The execution date of the DagRun to find.
:param run_id: The run_id of the DagRun to find.
:param session:
:return: The DagRun if found, otherwise None.
.. py:method:: get_dagruns_between(start_date, end_date, session=NEW_SESSION)
Returns the list of dag runs between start_date (inclusive) and end_date (inclusive).
:param start_date: The starting execution date of the DagRun to find.
:param end_date: The ending execution date of the DagRun to find.
:param session:
:return: The list of DagRuns found.
.. py:method:: get_latest_execution_date(session = NEW_SESSION)
Returns the latest date for which at least one dag run exists
.. py:method:: latest_execution_date()
:property:
This attribute is deprecated. Please use `airflow.models.DAG.get_latest_execution_date`.
.. py:method:: subdags()
:property:
Returns a list of the subdag objects associated to this DAG
.. py:method:: resolve_template_files()
.. py:method:: get_template_env(*, force_sandboxed = False)
Build a Jinja2 environment.
.. py:method:: set_dependency(upstream_task_id, downstream_task_id)
Simple utility method to set dependency between two tasks that
already have been added to the DAG using add_task()
.. py:method:: get_task_instances_before(base_date, num, *, session = NEW_SESSION)
Get ``num`` task instances before (including) ``base_date``.
The returned list may contain exactly ``num`` task instances. It can
have less if there are less than ``num`` scheduled DAG runs before
``base_date``, or more if there are manual task runs between the
requested period, which does not count toward ``num``.
.. py:method:: get_task_instances(start_date = None, end_date = None, state = None, session = NEW_SESSION)
.. py:method:: set_task_instance_state(*, task_id, map_indexes = None, execution_date = None, run_id = None, state, upstream = False, downstream = False, future = False, past = False, commit = True, session=NEW_SESSION)
Set the state of a TaskInstance to the given state, and clear its downstream tasks that are
in failed or upstream_failed state.
:param task_id: Task ID of the TaskInstance
:param map_indexes: Only set TaskInstance if its map_index matches.
If None (default), all mapped TaskInstances of the task are set.
:param execution_date: Execution date of the TaskInstance
:param run_id: The run_id of the TaskInstance
:param state: State to set the TaskInstance to
:param upstream: Include all upstream tasks of the given task_id
:param downstream: Include all downstream tasks of the given task_id
:param future: Include all future TaskInstances of the given task_id
:param commit: Commit changes
:param past: Include all past TaskInstances of the given task_id
.. py:method:: roots()
:property:
Return nodes with no parents. These are first to execute and are called roots or root nodes.
.. py:method:: leaves()
:property:
Return nodes with no children. These are last to execute and are called leaves or leaf nodes.
.. py:method:: topological_sort(include_subdag_tasks = False)
Sorts tasks in topographical order, such that a task comes after any of its
upstream dependencies.
Deprecated in place of ``task_group.topological_sort``
.. py:method:: set_dag_runs_state(state = State.RUNNING, session = NEW_SESSION, start_date = None, end_date = None, dag_ids = [])
.. py:method:: clear(task_ids = None, start_date = None, end_date = None, only_failed = False, only_running = False, confirm_prompt = False, include_subdags = True, include_parentdag = True, dag_run_state = DagRunState.QUEUED, dry_run = False, session = NEW_SESSION, get_tis = False, recursion_depth = 0, max_recursion_depth = None, dag_bag = None, exclude_task_ids = frozenset())
Clears a set of task instances associated with the current dag for
a specified date range.
:param task_ids: List of task ids or (``task_id``, ``map_index``) tuples to clear
:param start_date: The minimum execution_date to clear
:param end_date: The maximum execution_date to clear
:param only_failed: Only clear failed tasks
:param only_running: Only clear running tasks.
:param confirm_prompt: Ask for confirmation
:param include_subdags: Clear tasks in subdags and clear external tasks
indicated by ExternalTaskMarker
:param include_parentdag: Clear tasks in the parent dag of the subdag.
:param dag_run_state: state to set DagRun to. If set to False, dagrun state will not
be changed.
:param dry_run: Find the tasks to clear but don't clear them.
:param session: The sqlalchemy session to use
:param dag_bag: The DagBag used to find the dags subdags (Optional)
:param exclude_task_ids: A set of ``task_id`` or (``task_id``, ``map_index``)
tuples that should not be cleared
.. py:method:: clear_dags(dags, start_date=None, end_date=None, only_failed=False, only_running=False, confirm_prompt=False, include_subdags=True, include_parentdag=False, dag_run_state=DagRunState.QUEUED, dry_run=False)
:classmethod:
.. py:method:: __deepcopy__(memo)
.. py:method:: sub_dag(*args, **kwargs)
This method is deprecated in favor of partial_subset
.. py:method:: partial_subset(task_ids_or_regex, include_downstream=False, include_upstream=True, include_direct_upstream=False)
Returns a subset of the current dag as a deep copy of the current dag
based on a regex that should match one or many tasks, and includes
upstream and downstream neighbours based on the flag passed.
:param task_ids_or_regex: Either a list of task_ids, or a regex to
match against task ids (as a string, or compiled regex pattern).
:param include_downstream: Include all downstream tasks of matched
tasks, in addition to matched tasks.
:param include_upstream: Include all upstream tasks of matched tasks,
in addition to matched tasks.
:param include_direct_upstream: Include all tasks directly upstream of matched
and downstream (if include_downstream = True) tasks
.. py:method:: has_task(task_id)
.. py:method:: has_task_group(task_group_id)
.. py:method:: task_group_dict()
.. py:method:: get_task(task_id, include_subdags = False)
.. py:method:: pickle_info()
.. py:method:: pickle(session=NEW_SESSION)
.. py:method:: tree_view()
Print an ASCII tree representation of the DAG.
.. py:method:: task()
:property:
.. py:method:: add_task(task)
Add a task to the DAG
:param task: the task you want to add
.. py:method:: add_tasks(tasks)
Add a list of tasks to the DAG
:param tasks: a lit of tasks you want to add
.. py:method:: run(start_date=None, end_date=None, mark_success=False, local=False, executor=None, donot_pickle=conf.getboolean('core', 'donot_pickle'), ignore_task_deps=False, ignore_first_depends_on_past=True, pool=None, delay_on_limit_secs=1.0, verbose=False, conf=None, rerun_failed_tasks=False, run_backwards=False, run_at_least_once=False, continue_on_failures=False)
Runs the DAG.
:param start_date: the start date of the range to run
:param end_date: the end date of the range to run
:param mark_success: True to mark jobs as succeeded without running them
:param local: True to run the tasks using the LocalExecutor
:param executor: The executor instance to run the tasks
:param donot_pickle: True to avoid pickling DAG object and send to workers
:param ignore_task_deps: True to skip upstream tasks
:param ignore_first_depends_on_past: True to ignore depends_on_past
dependencies for the first set of tasks only
:param pool: Resource pool to use
:param delay_on_limit_secs: Time in seconds to wait before next attempt to run
dag run when max_active_runs limit has been reached
:param verbose: Make logging output more verbose
:param conf: user defined dictionary passed from CLI
:param rerun_failed_tasks:
:param run_backwards:
:param run_at_least_once: If true, always run the DAG at least once even
if no logical run exists within the time range.
.. py:method:: cli()
Exposes a CLI specific to this DAG
.. py:method:: create_dagrun(state, execution_date = None, run_id = None, start_date = None, external_trigger = False, conf = None, run_type = None, session=NEW_SESSION, dag_hash = None, creating_job_id = None, data_interval = None)
Creates a dag run from this dag including the tasks associated with this dag.
Returns the dag run.
:param run_id: defines the run id for this dag run
:param run_type: type of DagRun
:param execution_date: the execution date of this dag run
:param state: the state of the dag run
:param start_date: the date this dag run should be evaluated
:param external_trigger: whether this dag run is externally triggered
:param conf: Dict containing configuration/parameters to pass to the DAG
:param creating_job_id: id of the job creating this DagRun
:param session: database session
:param dag_hash: Hash of Serialized DAG
:param data_interval: Data interval of the DagRun
.. py:method:: bulk_sync_to_db(dags, session=NEW_SESSION)
:classmethod:
This method is deprecated in favor of bulk_write_to_db
.. py:method:: bulk_write_to_db(dags, processor_subdir = None, session=NEW_SESSION)
:classmethod:
Ensure the DagModel rows for the given dags are up-to-date in the dag table in the DB, including
calculated fields.
Note that this method can be called for both DAGs and SubDAGs. A SubDag is actually a SubDagOperator.
:param dags: the DAG objects to save to the DB
:return: None
.. py:method:: sync_to_db(processor_subdir = None, session=NEW_SESSION)
Save attributes about this DAG to the DB. Note that this method
can be called for both DAGs and SubDAGs. A SubDag is actually a
SubDagOperator.
:return: None
.. py:method:: get_default_view()
This is only there for backward compatible jinja2 templates
.. py:method:: deactivate_unknown_dags(active_dag_ids, session=NEW_SESSION)
:staticmethod:
Given a list of known DAGs, deactivate any other DAGs that are
marked as active in the ORM
:param active_dag_ids: list of DAG IDs that are active
:return: None
.. py:method:: deactivate_stale_dags(expiration_date, session=NEW_SESSION)
:staticmethod:
Deactivate any DAGs that were last touched by the scheduler before
the expiration date. These DAGs were likely deleted.
:param expiration_date: set inactive DAGs that were touched before this
time
:return: None
.. py:method:: get_num_task_instances(dag_id, task_ids=None, states=None, session=NEW_SESSION)
:staticmethod:
Returns the number of task instances in the given DAG.
:param session: ORM session
:param dag_id: ID of the DAG to get the task concurrency of
:param task_ids: A list of valid task IDs for the given DAG
:param states: A list of states to filter by if supplied
:return: The number of running tasks
:rtype: int
.. py:method:: get_serialized_fields()
:classmethod:
Stringified DAGs and operators contain exactly these fields.
.. py:method:: get_edge_info(upstream_task_id, downstream_task_id)
Returns edge information for the given pair of tasks if present, and
an empty edge if there is no information.
.. py:method:: set_edge_info(upstream_task_id, downstream_task_id, info)
Sets the given edge information on the DAG. Note that this will overwrite,
rather than merge with, existing info.
.. py:method:: validate_schedule_and_params()
Validates & raise exception if there are any Params in the DAG which neither have a default value nor
have the null in schema['type'] list, but the DAG have a schedule_interval which is not None.
.. py:method:: iter_invalid_owner_links()
Parses a given link, and verifies if it's a valid URL, or a 'mailto' link.
Returns an iterator of invalid (owner, link) pairs.
.. py:class:: DagTag
Bases: :py:obj:`airflow.models.base.Base`
A tag name per dag, to allow quick filtering in the DAG view.
.. py:attribute:: __tablename__
:annotation: = dag_tag
.. py:attribute:: name
.. py:attribute:: dag_id
.. py:method:: __repr__()
.. py:class:: DagOwnerAttributes
Bases: :py:obj:`airflow.models.base.Base`
Table defining different owner attributes. For example, a link for an owner that will be passed as
a hyperlink to the DAGs view
.. py:attribute:: __tablename__
:annotation: = dag_owner_attributes
.. py:attribute:: dag_id
.. py:attribute:: owner
.. py:attribute:: link
.. py:method:: __repr__()
.. py:method:: get_all(session)
:classmethod:
.. py:class:: DagModel(concurrency=None, **kwargs)
Bases: :py:obj:`airflow.models.base.Base`
Table containing DAG properties
.. py:attribute:: __tablename__
:annotation: = dag
These items are stored in the database for state related information
.. py:attribute:: dag_id
.. py:attribute:: root_dag_id
.. py:attribute:: is_paused_at_creation
.. py:attribute:: is_paused
.. py:attribute:: is_subdag
.. py:attribute:: is_active
.. py:attribute:: last_parsed_time
.. py:attribute:: last_pickled
.. py:attribute:: last_expired
.. py:attribute:: scheduler_lock
.. py:attribute:: pickle_id
.. py:attribute:: fileloc
.. py:attribute:: processor_subdir
.. py:attribute:: owners
.. py:attribute:: description
.. py:attribute:: default_view
.. py:attribute:: schedule_interval
.. py:attribute:: timetable_description
.. py:attribute:: tags
.. py:attribute:: dag_owner_links
.. py:attribute:: max_active_tasks
.. py:attribute:: max_active_runs
.. py:attribute:: has_task_concurrency_limits
.. py:attribute:: has_import_errors
.. py:attribute:: next_dagrun
.. py:attribute:: next_dagrun_data_interval_start
.. py:attribute:: next_dagrun_data_interval_end
.. py:attribute:: next_dagrun_create_after
.. py:attribute:: __table_args__
.. py:attribute:: parent_dag
.. py:attribute:: schedule_dataset_references
.. py:attribute:: schedule_datasets
.. py:attribute:: task_outlet_dataset_references
.. py:attribute:: NUM_DAGS_PER_DAGRUN_QUERY
.. py:method:: __repr__()
.. py:method:: next_dagrun_data_interval()
:property:
.. py:method:: timezone()
:property:
.. py:method:: get_dagmodel(dag_id, session=NEW_SESSION)
:staticmethod:
.. py:method:: get_current(dag_id, session=NEW_SESSION)
:classmethod:
.. py:method:: get_all_paused_dag_ids(session = NEW_SESSION)
:staticmethod:
Get a set of paused DAG ids
.. py:method:: get_last_dagrun(session=NEW_SESSION, include_externally_triggered=False)
.. py:method:: get_is_paused(*, session = None)
Provide interface compatibility to 'DAG'.
.. py:method:: get_paused_dag_ids(dag_ids, session = NEW_SESSION)
:staticmethod:
Given a list of dag_ids, get a set of Paused Dag Ids
:param dag_ids: List of Dag ids
:param session: ORM Session
:return: Paused Dag_ids
.. py:method:: get_default_view()
Get the Default DAG View, returns the default config value if DagModel does not
have a value
.. py:method:: safe_dag_id()
:property:
.. py:method:: relative_fileloc()
:property:
File location of the importable dag 'file' relative to the configured DAGs folder.
.. py:method:: set_is_paused(is_paused, including_subdags = True, session=NEW_SESSION)
Pause/Un-pause a DAG.
:param is_paused: Is the DAG paused
:param including_subdags: whether to include the DAG's subdags
:param session: session
.. py:method:: deactivate_deleted_dags(alive_dag_filelocs, session=NEW_SESSION)
:classmethod:
Set ``is_active=False`` on the DAGs for which the DAG files have been removed.
:param alive_dag_filelocs: file paths of alive DAGs
:param session: ORM Session
.. py:method:: dags_needing_dagruns(session)
:classmethod:
Return (and lock) a list of Dag objects that are due to create a new DagRun.
This will return a resultset of rows that is row-level-locked with a "SELECT ... FOR UPDATE" query,
you should ensure that any scheduling decisions are made in a single transaction -- as soon as the
transaction is committed it will be unlocked.
.. py:method:: calculate_dagrun_date_fields(dag, most_recent_dag_run)
Calculate ``next_dagrun`` and `next_dagrun_create_after``
:param dag: The DAG object
:param most_recent_dag_run: DataInterval (or datetime) of most recent run of this dag, or none
if not yet scheduled.
.. py:method:: get_dataset_triggered_next_run_info(*, session=NEW_SESSION)
.. py:function:: dag(dag_id = '', description = None, schedule = NOTSET, schedule_interval = NOTSET, timetable = None, start_date = None, end_date = None, full_filepath = None, template_searchpath = None, template_undefined = jinja2.StrictUndefined, user_defined_macros = None, user_defined_filters = None, default_args = None, concurrency = None, max_active_tasks = conf.getint('core', 'max_active_tasks_per_dag'), max_active_runs = conf.getint('core', 'max_active_runs_per_dag'), dagrun_timeout = None, sla_miss_callback = None, default_view = conf.get_mandatory_value('webserver', 'dag_default_view').lower(), orientation = conf.get_mandatory_value('webserver', 'dag_orientation'), catchup = conf.getboolean('scheduler', 'catchup_by_default'), on_success_callback = None, on_failure_callback = None, doc_md = None, params = None, access_control = None, is_paused_upon_creation = None, jinja_environment_kwargs = None, render_template_as_native_obj = False, tags = None, owner_links = None, auto_register = True)
Python dag decorator. Wraps a function into an Airflow DAG.
Accepts kwargs for operator kwarg. Can be used to parameterize DAGs.
:param dag_args: Arguments for DAG object
:param dag_kwargs: Kwargs for DAG object.
.. py:class:: DagContext
DAG context is used to keep the current DAG when DAG is used as ContextManager.
You can use DAG as context:
.. code-block:: python
with DAG(
dag_id="example_dag",
default_args=default_args,
schedule="0 0 * * *",
dagrun_timeout=timedelta(minutes=60),
) as dag:
...
If you do this the context stores the DAG and whenever new task is created, it will use
such stored DAG as the parent DAG.
.. py:attribute:: autoregistered_dags
:annotation: :set[tuple[DAG, types.ModuleType]]
.. py:attribute:: current_autoregister_module_name
:annotation: :str | None
.. py:method:: push_context_managed_dag(dag)
:classmethod:
.. py:method:: pop_context_managed_dag()
:classmethod:
.. py:method:: get_current_dag()
:classmethod: