| :py:mod:`airflow.models.dag` |
| ============================ |
| |
| .. py:module:: airflow.models.dag |
| |
| |
| Module Contents |
| --------------- |
| |
| Classes |
| ~~~~~~~ |
| |
| .. autoapisummary:: |
| |
| airflow.models.dag.DAG |
| airflow.models.dag.DagTag |
| airflow.models.dag.DagOwnerAttributes |
| airflow.models.dag.DagModel |
| airflow.models.dag.DagContext |
| |
| |
| |
| Functions |
| ~~~~~~~~~ |
| |
| .. autoapisummary:: |
| |
| airflow.models.dag.create_timetable |
| airflow.models.dag.get_last_dagrun |
| airflow.models.dag.get_dataset_triggered_next_run_info |
| airflow.models.dag.dag |
| |
| |
| |
| Attributes |
| ~~~~~~~~~~ |
| |
| .. autoapisummary:: |
| |
| airflow.models.dag.log |
| airflow.models.dag.DEFAULT_VIEW_PRESETS |
| airflow.models.dag.ORIENTATION_PRESETS |
| airflow.models.dag.TAG_MAX_LEN |
| airflow.models.dag.DagStateChangeCallback |
| airflow.models.dag.ScheduleInterval |
| airflow.models.dag.ScheduleIntervalArg |
| airflow.models.dag.ScheduleArg |
| airflow.models.dag.SLAMissCallback |
| airflow.models.dag.DEFAULT_SCHEDULE_INTERVAL |
| |
| |
| .. py:data:: log |
| |
| |
| |
| |
| .. py:data:: DEFAULT_VIEW_PRESETS |
| :annotation: = ['grid', 'graph', 'duration', 'gantt', 'landing_times'] |
| |
| |
| |
| .. py:data:: ORIENTATION_PRESETS |
| :annotation: = ['LR', 'TB', 'RL', 'BT'] |
| |
| |
| |
| .. py:data:: TAG_MAX_LEN |
| :annotation: = 100 |
| |
| |
| |
| .. py:data:: DagStateChangeCallback |
| |
| |
| |
| |
| .. py:data:: ScheduleInterval |
| |
| |
| |
| |
| .. py:data:: ScheduleIntervalArg |
| |
| |
| |
| |
| .. py:data:: ScheduleArg |
| |
| |
| |
| |
| .. py:data:: SLAMissCallback |
| |
| |
| |
| |
| .. py:data:: DEFAULT_SCHEDULE_INTERVAL |
| |
| |
| |
| |
| .. py:exception:: InconsistentDataInterval(instance, start_field_name, end_field_name) |
| |
| Bases: :py:obj:`airflow.exceptions.AirflowException` |
| |
| Exception raised when a model populates data interval fields incorrectly. |
| |
| The data interval fields should either both be None (for runs scheduled |
| prior to AIP-39), or both be datetime (for runs scheduled after AIP-39 is |
| implemented). This is raised if exactly one of the fields is None. |
| |
| .. py:method:: __str__() |
| |
| Return str(self). |
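The consistency rule described above (both fields ``None`` or both set) can be sketched in plain Python. This is a minimal illustration only; ``interval_is_consistent`` is a hypothetical helper and not part of Airflow.

```python
from datetime import datetime, timezone
from typing import Optional


def interval_is_consistent(start: Optional[datetime], end: Optional[datetime]) -> bool:
    """Return True when both fields are None or both are set (hypothetical helper)."""
    # Comparing the two "is None" results is False only when exactly one is None.
    return (start is None) == (end is None)


now = datetime(2022, 1, 1, tzinfo=timezone.utc)
print(interval_is_consistent(None, None))  # both unset, as for pre-AIP-39 runs
print(interval_is_consistent(now, now))    # both set, as for post-AIP-39 runs
print(interval_is_consistent(now, None))   # exactly one unset: the error case
```

A model that fails this check is the situation in which ``InconsistentDataInterval`` is raised.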
| |
| |
| |
| .. py:function:: create_timetable(interval, timezone) |
| |
| Create a Timetable instance from a ``schedule_interval`` argument. |
| |
| |
| .. py:function:: get_last_dagrun(dag_id, session, include_externally_triggered=False) |
| |
| Returns the last dag run for a dag, or None if there is none. |
| The last dag run can be of any type, e.g. scheduled or backfilled. |
| Overridden DagRuns are ignored. |
| |
| |
| .. py:function:: get_dataset_triggered_next_run_info(dag_ids, *, session) |
| |
| Given a list of dag_ids, get a string representing how close any dataset-triggered DAGs are |
| to their next run, e.g. "1 of 2 datasets updated". |
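As a rough illustration of the kind of string mentioned above, the sketch below builds a progress message per DAG. The input shape (a mapping of ``dag_id`` to a list of per-dataset "updated" flags) and the helper name are assumptions made for this example; the real function queries the database through ``session``.

```python
from typing import Dict, List


def summarize_dataset_progress(updates_by_dag: Dict[str, List[bool]]) -> Dict[str, str]:
    """Hypothetical helper: describe how many datasets per DAG have been updated."""
    summary = {}
    for dag_id, flags in updates_by_dag.items():
        # sum() counts the True flags, i.e. the datasets already updated.
        summary[dag_id] = f"{sum(flags)} of {len(flags)} datasets updated"
    return summary


info = summarize_dataset_progress({"example_dag": [True, False]})
print(info["example_dag"])  # 1 of 2 datasets updated
```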
| |
| |
| .. py:class:: DAG(dag_id, description = None, schedule = NOTSET, schedule_interval = NOTSET, timetable = None, start_date = None, end_date = None, full_filepath = None, template_searchpath = None, template_undefined = jinja2.StrictUndefined, user_defined_macros = None, user_defined_filters = None, default_args = None, concurrency = None, max_active_tasks = conf.getint('core', 'max_active_tasks_per_dag'), max_active_runs = conf.getint('core', 'max_active_runs_per_dag'), dagrun_timeout = None, sla_miss_callback = None, default_view = conf.get_mandatory_value('webserver', 'dag_default_view').lower(), orientation = conf.get_mandatory_value('webserver', 'dag_orientation'), catchup = conf.getboolean('scheduler', 'catchup_by_default'), on_success_callback = None, on_failure_callback = None, doc_md = None, params = None, access_control = None, is_paused_upon_creation = None, jinja_environment_kwargs = None, render_template_as_native_obj = False, tags = None, owner_links = None, auto_register = True) |
| |
| Bases: :py:obj:`airflow.utils.log.logging_mixin.LoggingMixin` |
| |
| A dag (directed acyclic graph) is a collection of tasks with directional |
| dependencies. A dag also has a schedule, a start date and an end date |
| (optional). For each schedule interval (say daily or hourly), the DAG needs to run |
| each individual task as its dependencies are met. Certain tasks have |
| the property of depending on their own past, meaning that they can't run |
| until their previous schedule (and upstream tasks) are completed. |
| |
| DAGs essentially act as namespaces for tasks. A task_id can only be |
| added once to a DAG. |
| |
| Note that if you plan to use time zones, all the dates provided should be pendulum |
| dates. See :ref:`timezone_aware_dags`. |
| |
| .. versionadded:: 2.4 |
| The *schedule* argument to specify either time-based scheduling logic |
| (timetable), or dataset-driven triggers. |
| |
| .. deprecated:: 2.4 |
| The arguments *schedule_interval* and *timetable*. Their functionalities |
| are merged into the new *schedule* argument. |
| |
| :param dag_id: The id of the DAG; must consist exclusively of alphanumeric |
| characters, dashes, dots and underscores (all ASCII) |
| :param description: The description for the DAG to e.g. be shown on the webserver |
| :param schedule: Defines the rules according to which DAG runs are scheduled. Can |
| accept cron string, timedelta object, Timetable, or list of Dataset objects. |
| See also :doc:`/howto/timetable`. |
| :param start_date: The timestamp from which the scheduler will |
| attempt to backfill |
| :param end_date: A date beyond which your DAG won't run; leave as None |
| for open-ended scheduling |
| :param template_searchpath: This list of folders (non-relative) |
| defines where jinja will look for your templates. Order matters. |
| Note that jinja/airflow includes the path of your DAG file by |
| default |
| :param template_undefined: Template undefined type. |
| :param user_defined_macros: a dictionary of macros that will be exposed |
| in your jinja templates. For example, passing ``dict(foo='bar')`` |
| to this argument allows you to use ``{{ foo }}`` in all jinja |
| templates related to this DAG. Note that you can pass any |
| type of object here. |
| :param user_defined_filters: a dictionary of filters that will be exposed |
| in your jinja templates. For example, passing |
| ``dict(hello=lambda name: 'Hello %s' % name)`` to this argument allows |
| you to use ``{{ 'world' | hello }}`` in all jinja templates related to |
| this DAG. |
| :param default_args: A dictionary of default parameters to be used |
| as constructor keyword parameters when initialising operators. |
| Note that operators have the same hook, and their values take precedence |
| over those defined here, meaning that if your dict contains ``'depends_on_past': True`` |
| here and ``'depends_on_past': False`` in the operator's call |
| ``default_args``, the actual value will be ``False``. |
| :param params: a dictionary of DAG level parameters that are made |
| accessible in templates, namespaced under ``params``. These |
| params can be overridden at the task level. |
| :param max_active_tasks: the number of task instances allowed to run |
| concurrently |
| :param max_active_runs: maximum number of active DAG runs; beyond this |
| number of DAG runs in a running state, the scheduler won't create |
| new active DAG runs |
| :param dagrun_timeout: specify how long a DagRun should be up before |
| timing out / failing, so that new DagRuns can be created. The timeout |
| is only enforced for scheduled DagRuns. |
| :param sla_miss_callback: specify a function to call when reporting SLA |
| timeouts. See :ref:`sla_miss_callback<concepts:sla_miss_callback>` for |
| more information about the function signature and parameters that are |
| passed to the callback. |
| :param default_view: Specify DAG default view (grid, graph, duration, |
| gantt, landing_times), default grid |
| :param orientation: Specify DAG orientation in graph view (LR, TB, RL, BT), default LR |
| :param catchup: Perform scheduler catchup (or only run latest)? Defaults to True |
| :param on_failure_callback: A function to be called when a DagRun of this dag fails. |
| A context dictionary is passed as a single parameter to this function. |
| :param on_success_callback: Much like the ``on_failure_callback`` except |
| that it is executed when the dag succeeds. |
| :param access_control: Specify optional DAG-level actions, e.g., |
| "{'role1': {'can_read'}, 'role2': {'can_read', 'can_edit', 'can_delete'}}" |
| :param is_paused_upon_creation: Specifies if the dag is paused when created for the first time. |
| If the dag exists already, this flag will be ignored. If this optional parameter |
| is not specified, the global config setting will be used. |
| :param jinja_environment_kwargs: additional configuration options to be passed to Jinja |
| ``Environment`` for template rendering |
| |
| **Example**: to prevent Jinja from removing a trailing newline from template strings :: |
| |
| DAG(dag_id='my-dag', |
| jinja_environment_kwargs={ |
| 'keep_trailing_newline': True, |
| # some other jinja2 Environment options here |
| } |
| ) |
| |
| **See**: `Jinja Environment documentation |
| <https://jinja.palletsprojects.com/en/2.11.x/api/#jinja2.Environment>`_ |
| |
| :param render_template_as_native_obj: If True, uses a Jinja ``NativeEnvironment`` |
| to render templates as native Python types. If False, a Jinja |
| ``Environment`` is used to render templates as string values. |
| :param tags: List of tags to help filtering DAGs in the UI. |
| :param owner_links: Dict of owners and their links, that will be clickable on the DAGs view UI. |
| Can be used as an HTTP link (for example the link to your Slack channel), or a mailto link. |
| e.g. {"dag_owner": "https://airflow.apache.org/"} |
| :param auto_register: Automatically register this DAG when it is used in a ``with`` block |
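The precedence rule stated for ``default_args`` (an operator's own ``default_args`` win over the DAG-level ones) can be pictured as a dictionary merge. This is a minimal sketch and not Airflow's implementation; ``resolve_default_args`` is a hypothetical name.

```python
from typing import Any, Dict


def resolve_default_args(
    dag_default_args: Dict[str, Any],
    operator_default_args: Dict[str, Any],
) -> Dict[str, Any]:
    """Hypothetical helper: merge defaults so operator-level values win."""
    merged = dict(dag_default_args)       # start from the DAG-level defaults
    merged.update(operator_default_args)  # operator-level values override them
    return merged


resolved = resolve_default_args(
    {"depends_on_past": True, "retries": 2},
    {"depends_on_past": False},
)
print(resolved)  # {'depends_on_past': False, 'retries': 2}
```

Keys that only appear at DAG level (``retries`` here) are kept unchanged.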
| |
| .. py:attribute:: fileloc |
| :annotation: :str |
| |
| File path that needs to be imported to load this DAG or subdag. |
| |
| This may not be an actual file on disk in the case when this DAG is loaded |
| from a ZIP file or other DAG distribution format. |
| |
| |
| .. py:attribute:: parent_dag |
| :annotation: :DAG | None |
| |
| |
| |
| .. py:method:: get_doc_md(doc_md) |
| |
| |
| .. py:method:: validate() |
| |
| Validate the DAG has a coherent setup. |
| |
| This is called by the DAG bag before bagging the DAG. |
| |
| |
| .. py:method:: __repr__() |
| |
| Return repr(self). |
| |
| |
| .. py:method:: __eq__(other) |
| |
| Return self==value. |
| |
| |
| .. py:method:: __ne__(other) |
| |
| Return self!=value. |
| |
| |
| .. py:method:: __lt__(other) |
| |
| Return self<value. |
| |
| |
| .. py:method:: __hash__() |
| |
| Return hash(self). |
| |
| |
| .. py:method:: __enter__() |
| |
| |
| .. py:method:: __exit__(_type, _value, _tb) |
| |
| |
| .. py:method:: date_range(start_date, num = None, end_date = None) |
| |
| |
| .. py:method:: is_fixed_time_schedule() |
| |
| |
| .. py:method:: following_schedule(dttm) |
| |
| Calculates the following schedule for this dag in UTC. |
| |
| :param dttm: utc datetime |
| :return: utc datetime |
| |
| |
| .. py:method:: previous_schedule(dttm) |
| |
| |
| .. py:method:: get_next_data_interval(dag_model) |
| |
| Get the data interval of the next scheduled run. |
| |
| For compatibility, this method infers the data interval from the DAG's |
| schedule if the run does not have an explicit one set, which is possible |
| for runs created prior to AIP-39. |
| |
| This function is private to Airflow core and should not be depended on as |
| part of the Python API. |
| |
| :meta private: |
| |
| |
| .. py:method:: get_run_data_interval(run) |
| |
| Get the data interval of this run. |
| |
| For compatibility, this method infers the data interval from the DAG's |
| schedule if the run does not have an explicit one set, which is possible for |
| runs created prior to AIP-39. |
| |
| This function is private to Airflow core and should not be depended on as |
| part of the Python API. |
| |
| :meta private: |
| |
| |
| .. py:method:: infer_automated_data_interval(logical_date) |
| |
| Infer a data interval for a run against this DAG. |
| |
| This method is used to bridge runs created prior to AIP-39 |
| implementation, which do not have an explicit data interval. Therefore, |
| this method only considers ``schedule_interval`` values valid prior to |
| Airflow 2.2. |
| |
| DO NOT use this method if there is a known data interval. |
| |
| |
| .. py:method:: next_dagrun_info(last_automated_dagrun, *, restricted = True) |
| |
| Get information about the next DagRun of this dag after ``last_automated_dagrun``. |
| |
| This calculates what time interval the next DagRun should operate on |
| (its execution date) and when it can be scheduled, according to the |
| dag's timetable, start_date, end_date, etc. This doesn't check max |
| active run or any other "max_active_tasks" type limits, but only |
| performs calculations based on the various date and interval fields of |
| this dag and its tasks. |
| |
| :param last_automated_dagrun: The ``max(execution_date)`` of |
| existing "automated" DagRuns for this dag (scheduled or backfill, |
| but not manual). |
| :param restricted: If set to *False* (default is *True*), ignore |
| ``start_date``, ``end_date``, and ``catchup`` specified on the DAG |
| or tasks. |
| :return: DagRunInfo of the next dagrun, or None if a dagrun is not |
| going to be scheduled. |
| |
| |
| .. py:method:: next_dagrun_after_date(date_last_automated_dagrun) |
| |
| |
| .. py:method:: iter_dagrun_infos_between(earliest, latest, *, align = True) |
| |
| Yield DagRunInfo using this DAG's timetable between given interval. |
| |
| DagRunInfo instances yielded if their ``logical_date`` is not earlier |
| than ``earliest``, nor later than ``latest``. The instances are ordered |
| by their ``logical_date`` from earliest to latest. |
| |
| If ``align`` is ``False``, the first run will happen immediately on |
| ``earliest``, even if it does not fall on the logical timetable schedule. |
| The default is ``True``, but subdags will ignore this value and always |
| behave as if this is set to ``False`` for backward compatibility. |
| |
| Example: A DAG is scheduled to run every midnight (``0 0 * * *``). If |
| ``earliest`` is ``2021-06-03 23:00:00``, the first DagRunInfo would be |
| ``2021-06-03 23:00:00`` if ``align=False``, and ``2021-06-04 00:00:00`` |
| if ``align=True``. |
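The ``align`` example above can be reproduced with a small stand-alone sketch for a daily-at-midnight schedule. It only models the documented first run; continuing on the following midnights after an unaligned first run is an assumption of this sketch, and ``iter_midnight_runs`` is a hypothetical function, not the timetable code Airflow uses.

```python
from datetime import datetime, timedelta
from typing import Iterator


def iter_midnight_runs(earliest: datetime, latest: datetime, *, align: bool = True) -> Iterator[datetime]:
    """Hypothetical sketch: yield logical dates for a ``0 0 * * *`` schedule."""
    midnight = earliest.replace(hour=0, minute=0, second=0, microsecond=0)
    on_schedule = earliest == midnight
    # First date that falls on the schedule at or after ``earliest``.
    first_aligned = midnight if on_schedule else midnight + timedelta(days=1)
    if not align and not on_schedule and earliest <= latest:
        # Unaligned mode: the first run happens immediately on ``earliest``.
        yield earliest
    current = first_aligned
    while current <= latest:
        yield current
        current += timedelta(days=1)


earliest = datetime(2021, 6, 3, 23, 0, 0)
latest = datetime(2021, 6, 5, 0, 0, 0)
print(next(iter_midnight_runs(earliest, latest, align=False)))  # 2021-06-03 23:00:00
print(next(iter_midnight_runs(earliest, latest, align=True)))   # 2021-06-04 00:00:00
```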
| |
| |
| .. py:method:: get_run_dates(start_date, end_date=None) |
| |
| Returns a list of dates between the interval received as parameter using this |
| dag's schedule interval. Returned dates can be used for execution dates. |
| |
| :param start_date: The start date of the interval. |
| :param end_date: The end date of the interval. Defaults to ``timezone.utcnow()``. |
| :return: A list of dates within the interval following the dag's schedule. |
| :rtype: list |
| |
| |
| .. py:method:: normalize_schedule(dttm) |
| |
| |
| .. py:method:: get_last_dagrun(session=NEW_SESSION, include_externally_triggered=False) |
| |
| |
| .. py:method:: has_dag_runs(session=NEW_SESSION, include_externally_triggered=True) |
| |
| |
| .. py:method:: dag_id() |
| :property: |
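The constructor states that a ``dag_id`` must consist exclusively of ASCII alphanumeric characters, dashes, dots and underscores. A minimal check for that rule might look like the sketch below; the pattern and the helper name are assumptions for illustration and may differ from the validation Airflow actually performs.

```python
import re

# Assumed pattern: ASCII letters, digits, underscore, dot and dash only.
DAG_ID_PATTERN = re.compile(r"[A-Za-z0-9_.-]+")


def is_valid_dag_id(dag_id: str) -> bool:
    """Hypothetical helper: True if every character is in the allowed set."""
    return DAG_ID_PATTERN.fullmatch(dag_id) is not None


print(is_valid_dag_id("my-dag.v1_daily"))  # True
print(is_valid_dag_id("my dag"))           # False, contains a space
```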
| |
| |
| .. py:method:: is_subdag() |
| :property: |
| |
| |
| .. py:method:: full_filepath() |
| :property: |
| |
| :meta private: |
| |
| |
| .. py:method:: concurrency() |
| :property: |
| |
| |
| .. py:method:: max_active_tasks() |
| :property: |
| |
| |
| .. py:method:: access_control() |
| :property: |
| |
| |
| .. py:method:: description() |
| :property: |
| |
| |
| .. py:method:: default_view() |
| :property: |
| |
| |
| .. py:method:: pickle_id() |
| :property: |
| |
| |
| .. py:method:: param(name, default = NOTSET) |
| |
| Return a DagParam object for current dag. |
| |
| :param name: dag parameter name. |
| :param default: fallback value for dag parameter. |
| :return: DagParam instance for specified name and current dag. |
| |
| |
| .. py:method:: tasks() |
| :property: |
| |
| |
| .. py:method:: task_ids() |
| :property: |
| |
| |
| .. py:method:: task_group() |
| :property: |
| |
| |
| .. py:method:: filepath() |
| :property: |
| |
| :meta private: |
| |
| |
| .. py:method:: relative_fileloc() |
| :property: |
| |
| File location of the importable dag 'file' relative to the configured DAGs folder. |
| |
| |
| .. py:method:: folder() |
| :property: |
| |
| Folder location of where the DAG object is instantiated. |
| |
| |
| .. py:method:: owner() |
| :property: |
| |
| Return list of all owners found in DAG tasks. |
| |
| :return: Comma separated list of owners in DAG tasks |
| :rtype: str |
| |
| |
| .. py:method:: allow_future_exec_dates() |
| :property: |
| |
| |
| .. py:method:: get_concurrency_reached(session=NEW_SESSION) |
| |
| Returns a boolean indicating whether the max_active_tasks limit for this DAG |
| has been reached |
| |
| |
| .. py:method:: concurrency_reached() |
| :property: |
| |
| This attribute is deprecated. Please use `airflow.models.DAG.get_concurrency_reached` method. |
| |
| |
| .. py:method:: get_is_active(session=NEW_SESSION) |
| |
| Returns a boolean indicating whether this DAG is active |
| |
| |
| .. py:method:: get_is_paused(session=NEW_SESSION) |
| |
| Returns a boolean indicating whether this DAG is paused |
| |
| |
| .. py:method:: is_paused() |
| :property: |
| |
| This attribute is deprecated. Please use `airflow.models.DAG.get_is_paused` method. |
| |
| |
| .. py:method:: normalized_schedule_interval() |
| :property: |
| |
| |
| .. py:method:: handle_callback(dagrun, success=True, reason=None, session=NEW_SESSION) |
| |
| Triggers the appropriate callback depending on the value of success, namely the |
| on_failure_callback or on_success_callback. This method gets the context of a |
| single TaskInstance part of this DagRun and passes that to the callable along |
| with a 'reason', primarily to differentiate DagRun failures. |
| |
| .. note:: The logs end up in |
| ``$AIRFLOW_HOME/logs/scheduler/latest/PROJECT/DAG_FILE.py.log`` |
| |
| :param dagrun: DagRun object |
| :param success: Flag to specify if failure or success callback should be called |
| :param reason: Completion reason |
| :param session: Database session |
| |
| |
| .. py:method:: get_active_runs() |
| |
| Returns a list of dag run execution dates currently running |
| |
| :return: List of execution dates |
| |
| |
| .. py:method:: get_num_active_runs(external_trigger=None, only_running=True, session=NEW_SESSION) |
| |
| Returns the number of active "running" dag runs |
| |
| :param external_trigger: True for externally triggered active dag runs |
| :param session: |
| :return: number greater than 0 for active dag runs |
| |
| |
| .. py:method:: get_dagrun(execution_date = None, run_id = None, session = NEW_SESSION) |
| |
| Returns the dag run for a given execution date or run_id if it exists, otherwise |
| None. |
| |
| :param execution_date: The execution date of the DagRun to find. |
| :param run_id: The run_id of the DagRun to find. |
| :param session: |
| :return: The DagRun if found, otherwise None. |
| |
| |
| .. py:method:: get_dagruns_between(start_date, end_date, session=NEW_SESSION) |
| |
| Returns the list of dag runs between start_date (inclusive) and end_date (inclusive). |
| |
| :param start_date: The starting execution date of the DagRun to find. |
| :param end_date: The ending execution date of the DagRun to find. |
| :param session: |
| :return: The list of DagRuns found. |
| |
| |
| .. py:method:: get_latest_execution_date(session = NEW_SESSION) |
| |
| Returns the latest date for which at least one dag run exists |
| |
| |
| .. py:method:: latest_execution_date() |
| :property: |
| |
| This attribute is deprecated. Please use `airflow.models.DAG.get_latest_execution_date`. |
| |
| |
| .. py:method:: subdags() |
| :property: |
| |
| Returns a list of the subdag objects associated to this DAG |
| |
| |
| .. py:method:: resolve_template_files() |
| |
| |
| .. py:method:: get_template_env(*, force_sandboxed = False) |
| |
| Build a Jinja2 environment. |
| |
| |
| .. py:method:: set_dependency(upstream_task_id, downstream_task_id) |
| |
| Simple utility method to set dependency between two tasks that |
| already have been added to the DAG using add_task() |
| |
| |
| .. py:method:: get_task_instances_before(base_date, num, *, session = NEW_SESSION) |
| |
| Get ``num`` task instances before (including) ``base_date``. |
| |
| The returned list may contain exactly ``num`` task instances. It can |
| have fewer if there are fewer than ``num`` scheduled DAG runs before |
| ``base_date``, or more if there are manual task runs within the |
| requested period, which do not count toward ``num``. |
| |
| |
| .. py:method:: get_task_instances(start_date = None, end_date = None, state = None, session = NEW_SESSION) |
| |
| |
| .. py:method:: set_task_instance_state(*, task_id, map_indexes = None, execution_date = None, run_id = None, state, upstream = False, downstream = False, future = False, past = False, commit = True, session=NEW_SESSION) |
| |
| Set the state of a TaskInstance to the given state, and clear its downstream tasks that are |
| in failed or upstream_failed state. |
| |
| :param task_id: Task ID of the TaskInstance |
| :param map_indexes: Only set TaskInstance if its map_index matches. |
| If None (default), all mapped TaskInstances of the task are set. |
| :param execution_date: Execution date of the TaskInstance |
| :param run_id: The run_id of the TaskInstance |
| :param state: State to set the TaskInstance to |
| :param upstream: Include all upstream tasks of the given task_id |
| :param downstream: Include all downstream tasks of the given task_id |
| :param future: Include all future TaskInstances of the given task_id |
| :param commit: Commit changes |
| :param past: Include all past TaskInstances of the given task_id |
| |
| |
| .. py:method:: roots() |
| :property: |
| |
| Return nodes with no parents. These are first to execute and are called roots or root nodes. |
| |
| |
| .. py:method:: leaves() |
| :property: |
| |
| Return nodes with no children. These are last to execute and are called leaves or leaf nodes. |
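To make the ``roots`` and ``leaves`` definitions concrete, the sketch below computes both for a small graph. The graph representation (``task_id`` mapped to its downstream ``task_id`` set) and the task names are assumptions for this example only.

```python
from typing import Dict, List, Set

# Hypothetical DAG: task_id -> set of directly downstream task_ids.
downstream: Dict[str, Set[str]] = {
    "extract": {"transform"},
    "transform": {"load", "report"},
    "load": set(),
    "report": set(),
}


def find_roots(graph: Dict[str, Set[str]]) -> List[str]:
    """Tasks that never appear as anyone's child, i.e. have no parents."""
    has_parent = {child for children in graph.values() for child in children}
    return sorted(task for task in graph if task not in has_parent)


def find_leaves(graph: Dict[str, Set[str]]) -> List[str]:
    """Tasks with an empty downstream set, i.e. have no children."""
    return sorted(task for task, children in graph.items() if not children)


print(find_roots(downstream))   # ['extract']
print(find_leaves(downstream))  # ['load', 'report']
```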
| |
| |
| .. py:method:: topological_sort(include_subdag_tasks = False) |
| |
| Sorts tasks in topological order, such that a task comes after any of its |
| upstream dependencies. |
| |
| Deprecated in favor of ``task_group.topological_sort`` |
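The ordering guarantee (a task comes after all of its upstream dependencies) is the standard topological sort. As an illustration independent of Airflow, the standard library's ``graphlib`` (Python 3.9+) produces such an order; the task names below are made up.

```python
from graphlib import TopologicalSorter

# Hypothetical DAG: task_id -> set of upstream task_ids (its predecessors).
upstream = {
    "extract": set(),
    "transform": {"extract"},
    "load": {"transform"},
    "report": {"transform"},
}

# static_order() yields each node only after all of its predecessors.
order = list(TopologicalSorter(upstream).static_order())
print(order)
```

The relative order of ``load`` and ``report`` is not fixed, since neither depends on the other.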
| |
| |
| .. py:method:: set_dag_runs_state(state = State.RUNNING, session = NEW_SESSION, start_date = None, end_date = None, dag_ids = []) |
| |
| |
| .. py:method:: clear(task_ids = None, start_date = None, end_date = None, only_failed = False, only_running = False, confirm_prompt = False, include_subdags = True, include_parentdag = True, dag_run_state = DagRunState.QUEUED, dry_run = False, session = NEW_SESSION, get_tis = False, recursion_depth = 0, max_recursion_depth = None, dag_bag = None, exclude_task_ids = frozenset()) |
| |
| Clears a set of task instances associated with the current dag for |
| a specified date range. |
| |
| :param task_ids: List of task ids or (``task_id``, ``map_index``) tuples to clear |
| :param start_date: The minimum execution_date to clear |
| :param end_date: The maximum execution_date to clear |
| :param only_failed: Only clear failed tasks |
| :param only_running: Only clear running tasks. |
| :param confirm_prompt: Ask for confirmation |
| :param include_subdags: Clear tasks in subdags and clear external tasks |
| indicated by ExternalTaskMarker |
| :param include_parentdag: Clear tasks in the parent dag of the subdag. |
| :param dag_run_state: state to set DagRun to. If set to False, dagrun state will not |
| be changed. |
| :param dry_run: Find the tasks to clear but don't clear them. |
| :param session: The sqlalchemy session to use |
| :param dag_bag: The DagBag used to find the dags subdags (Optional) |
| :param exclude_task_ids: A set of ``task_id`` or (``task_id``, ``map_index``) |
| tuples that should not be cleared |
| |
| |
| .. py:method:: clear_dags(dags, start_date=None, end_date=None, only_failed=False, only_running=False, confirm_prompt=False, include_subdags=True, include_parentdag=False, dag_run_state=DagRunState.QUEUED, dry_run=False) |
| :classmethod: |
| |
| |
| .. py:method:: __deepcopy__(memo) |
| |
| |
| .. py:method:: sub_dag(*args, **kwargs) |
| |
| This method is deprecated in favor of partial_subset |
| |
| |
| .. py:method:: partial_subset(task_ids_or_regex, include_downstream=False, include_upstream=True, include_direct_upstream=False) |
| |
| Returns a subset of the current dag as a deep copy of the current dag |
| based on a regex that should match one or many tasks, and includes |
| upstream and downstream neighbours based on the flag passed. |
| |
| :param task_ids_or_regex: Either a list of task_ids, or a regex to |
| match against task ids (as a string, or compiled regex pattern). |
| :param include_downstream: Include all downstream tasks of matched |
| tasks, in addition to matched tasks. |
| :param include_upstream: Include all upstream tasks of matched tasks, |
| in addition to matched tasks. |
| :param include_direct_upstream: Include all tasks directly upstream of matched |
| and downstream (if include_downstream = True) tasks |
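The selection step behind ``partial_subset`` (match tasks by regex, then pull in their upstream tasks) can be sketched on a plain dependency mapping. This shows only the default ``include_upstream=True`` behaviour, does not copy a DAG, and uses a hypothetical helper name and made-up task ids.

```python
import re
from typing import Dict, Set


def select_with_upstream(upstream: Dict[str, Set[str]], pattern: str) -> Set[str]:
    """Hypothetical helper: task ids matching ``pattern`` plus all their ancestors."""
    matched = {task for task in upstream if re.search(pattern, task)}
    selected = set(matched)
    stack = list(matched)
    while stack:
        task = stack.pop()
        for parent in upstream[task]:
            if parent not in selected:
                # Walk upwards until every transitive upstream task is included.
                selected.add(parent)
                stack.append(parent)
    return selected


# Hypothetical DAG: task_id -> set of directly upstream task_ids.
upstream_map = {
    "extract": set(),
    "transform": {"extract"},
    "load": {"transform"},
    "report": {"transform"},
}
print(sorted(select_with_upstream(upstream_map, r"^load$")))  # ['extract', 'load', 'transform']
```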
| |
| |
| .. py:method:: has_task(task_id) |
| |
| |
| .. py:method:: has_task_group(task_group_id) |
| |
| |
| .. py:method:: task_group_dict() |
| |
| |
| .. py:method:: get_task(task_id, include_subdags = False) |
| |
| |
| .. py:method:: pickle_info() |
| |
| |
| .. py:method:: pickle(session=NEW_SESSION) |
| |
| |
| .. py:method:: tree_view() |
| |
| Print an ASCII tree representation of the DAG. |
| |
| |
| .. py:method:: task() |
| :property: |
| |
| |
| .. py:method:: add_task(task) |
| |
| Add a task to the DAG |
| |
| :param task: the task you want to add |
| |
| |
| .. py:method:: add_tasks(tasks) |
| |
| Add a list of tasks to the DAG |
| |
| :param tasks: a list of tasks you want to add |
| |
| |
| .. py:method:: run(start_date=None, end_date=None, mark_success=False, local=False, executor=None, donot_pickle=conf.getboolean('core', 'donot_pickle'), ignore_task_deps=False, ignore_first_depends_on_past=True, pool=None, delay_on_limit_secs=1.0, verbose=False, conf=None, rerun_failed_tasks=False, run_backwards=False, run_at_least_once=False, continue_on_failures=False) |
| |
| Runs the DAG. |
| |
| :param start_date: the start date of the range to run |
| :param end_date: the end date of the range to run |
| :param mark_success: True to mark jobs as succeeded without running them |
| :param local: True to run the tasks using the LocalExecutor |
| :param executor: The executor instance to run the tasks |
| :param donot_pickle: True to avoid pickling the DAG object and sending it to workers |
| :param ignore_task_deps: True to skip upstream tasks |
| :param ignore_first_depends_on_past: True to ignore depends_on_past |
| dependencies for the first set of tasks only |
| :param pool: Resource pool to use |
| :param delay_on_limit_secs: Time in seconds to wait before next attempt to run |
| dag run when max_active_runs limit has been reached |
| :param verbose: Make logging output more verbose |
| :param conf: user defined dictionary passed from CLI |
| :param rerun_failed_tasks: |
| :param run_backwards: |
| :param run_at_least_once: If true, always run the DAG at least once even |
| if no logical run exists within the time range. |
| |
| |
| .. py:method:: cli() |
| |
| Exposes a CLI specific to this DAG |
| |
| |
| .. py:method:: create_dagrun(state, execution_date = None, run_id = None, start_date = None, external_trigger = False, conf = None, run_type = None, session=NEW_SESSION, dag_hash = None, creating_job_id = None, data_interval = None) |
| |
| Creates a dag run from this dag including the tasks associated with this dag. |
| Returns the dag run. |
| |
| :param run_id: defines the run id for this dag run |
| :param run_type: type of DagRun |
| :param execution_date: the execution date of this dag run |
| :param state: the state of the dag run |
| :param start_date: the date this dag run should be evaluated |
| :param external_trigger: whether this dag run is externally triggered |
| :param conf: Dict containing configuration/parameters to pass to the DAG |
| :param creating_job_id: id of the job creating this DagRun |
| :param session: database session |
| :param dag_hash: Hash of Serialized DAG |
| :param data_interval: Data interval of the DagRun |
| |
| |
| .. py:method:: bulk_sync_to_db(dags, session=NEW_SESSION) |
| :classmethod: |
| |
| This method is deprecated in favor of bulk_write_to_db |
| |
| |
| .. py:method:: bulk_write_to_db(dags, processor_subdir = None, session=NEW_SESSION) |
| :classmethod: |
| |
| Ensure the DagModel rows for the given dags are up-to-date in the dag table in the DB, including |
| calculated fields. |
| |
| Note that this method can be called for both DAGs and SubDAGs. A SubDag is actually a SubDagOperator. |
| |
| :param dags: the DAG objects to save to the DB |
| :return: None |
| |
| |
| .. py:method:: sync_to_db(processor_subdir = None, session=NEW_SESSION) |
| |
| Save attributes about this DAG to the DB. Note that this method |
| can be called for both DAGs and SubDAGs. A SubDag is actually a |
| SubDagOperator. |
| |
| :return: None |
| |
| |
| .. py:method:: get_default_view() |
| |
| This is only there for backward compatible jinja2 templates |
| |
| |
| .. py:method:: deactivate_unknown_dags(active_dag_ids, session=NEW_SESSION) |
| :staticmethod: |
| |
| Given a list of known DAGs, deactivate any other DAGs that are |
| marked as active in the ORM |
| |
| :param active_dag_ids: list of DAG IDs that are active |
| :return: None |
| |
| |
| .. py:method:: deactivate_stale_dags(expiration_date, session=NEW_SESSION) |
| :staticmethod: |
| |
| Deactivate any DAGs that were last touched by the scheduler before |
| the expiration date. These DAGs were likely deleted. |
| |
| :param expiration_date: set inactive DAGs that were touched before this |
| time |
| :return: None |
| |
| |
| .. py:method:: get_num_task_instances(dag_id, task_ids=None, states=None, session=NEW_SESSION) |
| :staticmethod: |
| |
| Returns the number of task instances in the given DAG. |
| |
| :param session: ORM session |
| :param dag_id: ID of the DAG to get the task concurrency of |
| :param task_ids: A list of valid task IDs for the given DAG |
| :param states: A list of states to filter by if supplied |
| :return: The number of running tasks |
| :rtype: int |
| |
| |
| .. py:method:: get_serialized_fields() |
| :classmethod: |
| |
| Stringified DAGs and operators contain exactly these fields. |
| |
| |
| .. py:method:: get_edge_info(upstream_task_id, downstream_task_id) |
| |
| Returns edge information for the given pair of tasks if present, and |
| an empty edge if there is no information. |
| |
| |
| .. py:method:: set_edge_info(upstream_task_id, downstream_task_id, info) |
| |
| Sets the given edge information on the DAG. Note that this will overwrite, |
| rather than merge with, existing info. |
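The two behaviours described for edge information (an empty edge when nothing is stored, and overwrite rather than merge on set) can be illustrated with a nested dictionary. ``EdgeInfoStore`` is a hypothetical stand-in, not the structure Airflow uses internally.

```python
from typing import Any, Dict


class EdgeInfoStore:
    """Hypothetical store keyed by upstream task id, then downstream task id."""

    def __init__(self) -> None:
        self._edges: Dict[str, Dict[str, Dict[str, Any]]] = {}

    def get_edge_info(self, upstream_task_id: str, downstream_task_id: str) -> Dict[str, Any]:
        # Fall back to an empty edge when no information was stored.
        return self._edges.get(upstream_task_id, {}).get(downstream_task_id, {})

    def set_edge_info(self, upstream_task_id: str, downstream_task_id: str, info: Dict[str, Any]) -> None:
        # Plain assignment replaces any existing info instead of merging it.
        self._edges.setdefault(upstream_task_id, {})[downstream_task_id] = info


store = EdgeInfoStore()
print(store.get_edge_info("extract", "load"))  # {}
store.set_edge_info("extract", "load", {"label": "raw rows", "color": "blue"})
store.set_edge_info("extract", "load", {"label": "clean rows"})
print(store.get_edge_info("extract", "load"))  # {'label': 'clean rows'}
```

After the second call the ``color`` key is gone, which is the overwrite behaviour the note above warns about.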
| |
| |
| .. py:method:: validate_schedule_and_params() |
| |
| Validates the DAG and raises an exception if any Param in the DAG has neither a default value nor |
| ``null`` in its ``schema['type']`` list while the DAG has a ``schedule_interval`` that is not None. |
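The rule above can be sketched as a check over plain dictionaries. The param representation (a dict with optional ``default`` and ``schema`` keys) and the helper name are assumptions for illustration; Airflow works with ``Param`` objects and raises an exception instead of returning names.

```python
from typing import Any, Dict, List, Optional


def find_params_blocking_schedule(
    params: Dict[str, Dict[str, Any]],
    schedule_interval: Optional[str],
) -> List[str]:
    """Hypothetical helper: names of params that make a scheduled DAG invalid."""
    if schedule_interval is None:
        # Unscheduled DAGs are always triggered with explicit values.
        return []
    blocking = []
    for name, spec in params.items():
        has_default = "default" in spec
        schema_type = spec.get("schema", {}).get("type", [])
        types = schema_type if isinstance(schema_type, list) else [schema_type]
        if not has_default and "null" not in types:
            blocking.append(name)
    return blocking


example_params = {
    "with_default": {"default": 5, "schema": {"type": "integer"}},
    "nullable": {"schema": {"type": ["null", "string"]}},
    "required": {"schema": {"type": "string"}},
}
print(find_params_blocking_schedule(example_params, "@daily"))  # ['required']
print(find_params_blocking_schedule(example_params, None))      # []
```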
| |
| |
| .. py:method:: iter_invalid_owner_links() |
| |
| Parses a given link, and verifies if it's a valid URL, or a 'mailto' link. |
| Returns an iterator of invalid (owner, link) pairs. |
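One way to picture the link check is with ``urllib.parse.urlsplit`` from the standard library. The rules below (a ``mailto`` link needs an address, any other link needs both a scheme and a host) are an assumption about what "valid" means here, and ``iter_invalid_links`` is a hypothetical stand-alone function.

```python
from typing import Dict, Iterator, Tuple
from urllib.parse import urlsplit


def iter_invalid_links(owner_links: Dict[str, str]) -> Iterator[Tuple[str, str]]:
    """Hypothetical sketch: yield (owner, link) pairs that do not look valid."""
    for owner, link in owner_links.items():
        parts = urlsplit(link)
        if parts.scheme == "mailto":
            # A mailto link has no network location; the address is in the path.
            if not parts.path:
                yield owner, link
        elif not parts.scheme or not parts.netloc:
            yield owner, link


links = {
    "alice": "https://airflow.apache.org/",
    "bob": "not a link",
    "carol": "mailto:carol@example.com",
}
print(list(iter_invalid_links(links)))  # [('bob', 'not a link')]
```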
| |
| |
| |
| .. py:class:: DagTag |
| |
| Bases: :py:obj:`airflow.models.base.Base` |
| |
| A tag name per dag, to allow quick filtering in the DAG view. |
| |
| .. py:attribute:: __tablename__ |
| :annotation: = dag_tag |
| |
| |
| |
| .. py:attribute:: name |
| |
| |
| |
| |
| .. py:attribute:: dag_id |
| |
| |
| |
| |
| .. py:method:: __repr__() |
| |
| |
| |
| .. py:class:: DagOwnerAttributes |
| |
| Bases: :py:obj:`airflow.models.base.Base` |
| |
| Table defining different owner attributes. For example, a link for an owner that is rendered as |
| a hyperlink in the DAGs view. |
| |
| .. py:attribute:: __tablename__ |
| :annotation: = dag_owner_attributes |
| |
| |
| |
| .. py:attribute:: dag_id |
| |
| |
| |
| |
| .. py:attribute:: owner |
| |
| |
| |
| |
| .. py:attribute:: link |
| |
| |
| |
| |
| .. py:method:: __repr__() |
| |
| |
| .. py:method:: get_all(session) |
| :classmethod: |
| |
| |
| |
| .. py:class:: DagModel(concurrency=None, **kwargs) |
| |
| Bases: :py:obj:`airflow.models.base.Base` |
| |
| Table containing DAG properties. |
| |
| .. py:attribute:: __tablename__ |
| :annotation: = dag |
| |
| These items are stored in the database for state-related information. |
| |
| |
| .. py:attribute:: dag_id |
| |
| |
| |
| |
| .. py:attribute:: root_dag_id |
| |
| |
| |
| |
| .. py:attribute:: is_paused_at_creation |
| |
| |
| |
| |
| .. py:attribute:: is_paused |
| |
| |
| |
| |
| .. py:attribute:: is_subdag |
| |
| |
| |
| |
| .. py:attribute:: is_active |
| |
| |
| |
| |
| .. py:attribute:: last_parsed_time |
| |
| |
| |
| |
| .. py:attribute:: last_pickled |
| |
| |
| |
| |
| .. py:attribute:: last_expired |
| |
| |
| |
| |
| .. py:attribute:: scheduler_lock |
| |
| |
| |
| |
| .. py:attribute:: pickle_id |
| |
| |
| |
| |
| .. py:attribute:: fileloc |
| |
| |
| |
| |
| .. py:attribute:: processor_subdir |
| |
| |
| |
| |
| .. py:attribute:: owners |
| |
| |
| |
| |
| .. py:attribute:: description |
| |
| |
| |
| |
| .. py:attribute:: default_view |
| |
| |
| |
| |
| .. py:attribute:: schedule_interval |
| |
| |
| |
| |
| .. py:attribute:: timetable_description |
| |
| |
| |
| |
| .. py:attribute:: tags |
| |
| |
| |
| |
| .. py:attribute:: dag_owner_links |
| |
| |
| |
| |
| .. py:attribute:: max_active_tasks |
| |
| |
| |
| |
| .. py:attribute:: max_active_runs |
| |
| |
| |
| |
| .. py:attribute:: has_task_concurrency_limits |
| |
| |
| |
| |
| .. py:attribute:: has_import_errors |
| |
| |
| |
| |
| .. py:attribute:: next_dagrun |
| |
| |
| |
| |
| .. py:attribute:: next_dagrun_data_interval_start |
| |
| |
| |
| |
| .. py:attribute:: next_dagrun_data_interval_end |
| |
| |
| |
| |
| .. py:attribute:: next_dagrun_create_after |
| |
| |
| |
| |
| .. py:attribute:: __table_args__ |
| |
| |
| |
| |
| .. py:attribute:: parent_dag |
| |
| |
| |
| |
| .. py:attribute:: schedule_dataset_references |
| |
| |
| |
| |
| .. py:attribute:: schedule_datasets |
| |
| |
| |
| |
| .. py:attribute:: task_outlet_dataset_references |
| |
| |
| |
| |
| .. py:attribute:: NUM_DAGS_PER_DAGRUN_QUERY |
| |
| |
| |
| |
| .. py:method:: __repr__() |
| |
| |
| .. py:method:: next_dagrun_data_interval() |
| :property: |
| |
| |
| .. py:method:: timezone() |
| :property: |
| |
| |
| .. py:method:: get_dagmodel(dag_id, session=NEW_SESSION) |
| :staticmethod: |
| |
| |
| .. py:method:: get_current(dag_id, session=NEW_SESSION) |
| :classmethod: |
| |
| |
| .. py:method:: get_all_paused_dag_ids(session = NEW_SESSION) |
| :staticmethod: |
| |
| Get a set of paused DAG IDs. |
| |
| |
| .. py:method:: get_last_dagrun(session=NEW_SESSION, include_externally_triggered=False) |
| |
| |
| .. py:method:: get_is_paused(*, session = None) |
| |
| Provide interface compatibility to 'DAG'. |
| |
| |
| .. py:method:: get_paused_dag_ids(dag_ids, session = NEW_SESSION) |
| :staticmethod: |
| |
| Given a list of DAG IDs, get the set of those that are paused. |
| |
| :param dag_ids: List of DAG IDs |
| :param session: ORM session |
| :return: Set of paused DAG IDs |
| |
| |
| .. py:method:: get_default_view() |
| |
| Get the default DAG view. Returns the default config value if the DagModel does not |
| have a value. |
| |
| |
| .. py:method:: safe_dag_id() |
| :property: |
| |
| |
| .. py:method:: relative_fileloc() |
| :property: |
| |
| File location of the importable dag 'file' relative to the configured DAGs folder. |
| |
| |
| .. py:method:: set_is_paused(is_paused, including_subdags = True, session=NEW_SESSION) |
| |
| Pause or unpause a DAG. |
| |
| :param is_paused: whether the DAG should be paused |
| :param including_subdags: whether to include the DAG's subdags |
| :param session: ORM session |
| |
| |
| .. py:method:: deactivate_deleted_dags(alive_dag_filelocs, session=NEW_SESSION) |
| :classmethod: |
| |
| Set ``is_active=False`` on the DAGs for which the DAG files have been removed. |
| |
| :param alive_dag_filelocs: file paths of alive DAGs |
| :param session: ORM Session |
| |
| |
| .. py:method:: dags_needing_dagruns(session) |
| :classmethod: |
| |
| Return (and lock) a list of Dag objects that are due to create a new DagRun. |
| |
| This returns a result set of rows that are row-level locked with a "SELECT ... FOR UPDATE" query. |
| Ensure that any scheduling decisions are made in a single transaction, because the rows are |
| unlocked as soon as the transaction is committed. |
| |
| |
| .. py:method:: calculate_dagrun_date_fields(dag, most_recent_dag_run) |
| |
| Calculate ``next_dagrun`` and ``next_dagrun_create_after``. |
| |
| :param dag: The DAG object |
| :param most_recent_dag_run: DataInterval (or datetime) of the most recent run of this DAG, or None |
| if not yet scheduled. |
| |
| |
| .. py:method:: get_dataset_triggered_next_run_info(*, session=NEW_SESSION) |
| |
| |
| |
| .. py:function:: dag(dag_id = '', description = None, schedule = NOTSET, schedule_interval = NOTSET, timetable = None, start_date = None, end_date = None, full_filepath = None, template_searchpath = None, template_undefined = jinja2.StrictUndefined, user_defined_macros = None, user_defined_filters = None, default_args = None, concurrency = None, max_active_tasks = conf.getint('core', 'max_active_tasks_per_dag'), max_active_runs = conf.getint('core', 'max_active_runs_per_dag'), dagrun_timeout = None, sla_miss_callback = None, default_view = conf.get_mandatory_value('webserver', 'dag_default_view').lower(), orientation = conf.get_mandatory_value('webserver', 'dag_orientation'), catchup = conf.getboolean('scheduler', 'catchup_by_default'), on_success_callback = None, on_failure_callback = None, doc_md = None, params = None, access_control = None, is_paused_upon_creation = None, jinja_environment_kwargs = None, render_template_as_native_obj = False, tags = None, owner_links = None, auto_register = True) |
| |
| Python DAG decorator. Wraps a function into an Airflow DAG. |
| Accepts keyword arguments that are passed to the DAG object. Can be used to parameterize DAGs. |
| |
| :param dag_args: Arguments for DAG object |
| :param dag_kwargs: Kwargs for DAG object. |
| |
| |
| .. py:class:: DagContext |
| |
| DAG context is used to keep track of the current DAG when a DAG is used as a context manager. |
| |
| You can use a DAG as a context manager: |
| |
| .. code-block:: python |
| |
| with DAG( |
| dag_id="example_dag", |
| default_args=default_args, |
| schedule="0 0 * * *", |
| dagrun_timeout=timedelta(minutes=60), |
| ) as dag: |
| ... |
| |
| If you do this, the context stores the DAG, and whenever a new task is created, it uses |
| the stored DAG as its parent DAG. |
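The mechanism can be modelled without Airflow as a small stack of "current" objects. ``ContextStack``, ``FakeDag``, and ``make_task`` below are hypothetical names invented for this sketch; they only mirror the push, pop, and get-current pattern and are not the real classes:

```python
class ContextStack:
    """Hypothetical model of a stack holding the context-managed DAGs."""

    _stack = []

    @classmethod
    def push(cls, dag):
        cls._stack.append(dag)

    @classmethod
    def pop(cls):
        return cls._stack.pop()

    @classmethod
    def current(cls):
        # The innermost active DAG, or None outside any ``with`` block.
        return cls._stack[-1] if cls._stack else None


class FakeDag:
    """Stand-in for a DAG that registers itself while used as a context manager."""

    def __init__(self, dag_id):
        self.dag_id = dag_id
        self.task_ids = []

    def __enter__(self):
        ContextStack.push(self)
        return self

    def __exit__(self, exc_type, exc, tb):
        ContextStack.pop()


def make_task(task_id):
    # A new task attaches itself to whichever DAG is currently stored.
    dag = ContextStack.current()
    if dag is not None:
        dag.task_ids.append(task_id)
    return task_id


with FakeDag("outer") as outer:
    make_task("a")
    with FakeDag("inner") as inner:
        make_task("b")
    make_task("c")
```

Using a stack rather than a single slot means nested ``with`` blocks restore the outer DAG once the inner block exits.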
| |
| |
| .. py:attribute:: autoregistered_dags |
| :annotation: :set[tuple[DAG, types.ModuleType]] |
| |
| |
| |
| .. py:attribute:: current_autoregister_module_name |
| :annotation: :str | None |
| |
| |
| |
| .. py:method:: push_context_managed_dag(dag) |
| :classmethod: |
| |
| |
| .. py:method:: pop_context_managed_dag() |
| :classmethod: |
| |
| |
| .. py:method:: get_current_dag() |
| :classmethod: |
| |
| |
| |