| :mod:`airflow.models` |
| ===================== |
| |
| .. py:module:: airflow.models |
| |
| .. autoapi-nested-parse:: |
| |
| Airflow models |
| |
| |
| |
| Submodules |
| ---------- |
| .. toctree:: |
| :titlesonly: |
| :maxdepth: 1 |
| |
| base/index.rst |
| baseoperator/index.rst |
| connection/index.rst |
| crypto/index.rst |
| dag/index.rst |
| dagbag/index.rst |
| dagcode/index.rst |
| dagparam/index.rst |
| dagpickle/index.rst |
| dagrun/index.rst |
| errors/index.rst |
| log/index.rst |
| pool/index.rst |
| renderedtifields/index.rst |
| sensorinstance/index.rst |
| serialized_dag/index.rst |
| skipmixin/index.rst |
| slamiss/index.rst |
| taskfail/index.rst |
| taskinstance/index.rst |
| taskmixin/index.rst |
| taskreschedule/index.rst |
| variable/index.rst |
| xcom/index.rst |
| xcom_arg/index.rst |
| |
| |
| Package Contents |
| ---------------- |
| |
| .. data:: ID_LEN |
| :annotation: = 250 |
| |
| |
| |
| .. data:: Base |
| :annotation: :Any |
| |
| |
| |
| .. py:class:: BaseOperator(task_id: str, owner: str = conf.get('operators', 'DEFAULT_OWNER'), email: Optional[Union[str, Iterable[str]]] = None, email_on_retry: bool = conf.getboolean('email', 'default_email_on_retry', fallback=True), email_on_failure: bool = conf.getboolean('email', 'default_email_on_failure', fallback=True), retries: Optional[int] = conf.getint('core', 'default_task_retries', fallback=0), retry_delay: timedelta = timedelta(seconds=300), retry_exponential_backoff: bool = False, max_retry_delay: Optional[timedelta] = None, start_date: Optional[datetime] = None, end_date: Optional[datetime] = None, depends_on_past: bool = False, wait_for_downstream: bool = False, dag=None, params: Optional[Dict] = None, default_args: Optional[Dict] = None, priority_weight: int = 1, weight_rule: str = WeightRule.DOWNSTREAM, queue: str = conf.get('operators', 'default_queue'), pool: Optional[str] = None, pool_slots: int = 1, sla: Optional[timedelta] = None, execution_timeout: Optional[timedelta] = None, on_execute_callback: Optional[TaskStateChangeCallback] = None, on_failure_callback: Optional[TaskStateChangeCallback] = None, on_success_callback: Optional[TaskStateChangeCallback] = None, on_retry_callback: Optional[TaskStateChangeCallback] = None, trigger_rule: str = TriggerRule.ALL_SUCCESS, resources: Optional[Dict] = None, run_as_user: Optional[str] = None, task_concurrency: Optional[int] = None, executor_config: Optional[Dict] = None, do_xcom_push: bool = True, inlets: Optional[Any] = None, outlets: Optional[Any] = None, task_group: Optional['TaskGroup'] = None, doc: Optional[str] = None, doc_md: Optional[str] = None, doc_json: Optional[str] = None, doc_yaml: Optional[str] = None, doc_rst: Optional[str] = None, **kwargs) |
| |
| Bases: :class:`airflow.models.base.Operator`, :class:`airflow.utils.log.logging_mixin.LoggingMixin`, :class:`airflow.models.taskmixin.TaskMixin` |
| |
| Abstract base class for all operators. Since operators create objects that |
| become nodes in the dag, BaseOperator contains many recursive methods for |
| dag crawling behavior. To derive this class, you are expected to override |
| the constructor as well as the 'execute' method. |
| |
| Operators derived from this class should perform or trigger certain tasks |
| synchronously (wait for completion). Example of operators could be an |
| operator that runs a Pig job (PigOperator), a sensor operator that |
| waits for a partition to land in Hive (HiveSensorOperator), or one that |
| moves data from Hive to MySQL (Hive2MySqlOperator). Instances of these |
| operators (tasks) target specific operations, running specific scripts, |
| functions or data transfers. |
| |
| This class is abstract and shouldn't be instantiated. Instantiating a |
| class derived from this one results in the creation of a task object, |
| which ultimately becomes a node in DAG objects. Task dependencies should |
| be set by using the set_upstream and/or set_downstream methods. |
| |
| :param task_id: a unique, meaningful id for the task |
| :type task_id: str |
| :param owner: the owner of the task, using the unix username is recommended |
| :type owner: str |
| :param email: the 'to' email address(es) used in email alerts. This can be a |
| single email or multiple ones. Multiple addresses can be specified as a |
| comma or semi-colon separated string or by passing a list of strings. |
| :type email: str or list[str] |
| :param email_on_retry: Indicates whether email alerts should be sent when a |
| task is retried |
| :type email_on_retry: bool |
| :param email_on_failure: Indicates whether email alerts should be sent when |
| a task failed |
| :type email_on_failure: bool |
| :param retries: the number of retries that should be performed before |
| failing the task |
| :type retries: int |
| :param retry_delay: delay between retries |
| :type retry_delay: datetime.timedelta |
| :param retry_exponential_backoff: allow progressive longer waits between |
| retries by using exponential backoff algorithm on retry delay (delay |
| will be converted into seconds) |
| :type retry_exponential_backoff: bool |
| :param max_retry_delay: maximum delay interval between retries |
| :type max_retry_delay: datetime.timedelta |
| :param start_date: The ``start_date`` for the task, determines |
| the ``execution_date`` for the first task instance. The best practice |
| is to have the start_date rounded |
| to your DAG's ``schedule_interval``. Daily jobs have their start_date |
| some day at 00:00:00, hourly jobs have their start_date at 00:00 |
| of a specific hour. Note that Airflow simply looks at the latest |
| ``execution_date`` and adds the ``schedule_interval`` to determine |
| the next ``execution_date``. It is also very important |
| to note that different tasks' dependencies |
| need to line up in time. If task A depends on task B and their |
| start_date are offset in a way that their execution_date don't line |
| up, A's dependencies will never be met. If you are looking to delay |
| a task, for example running a daily task at 2AM, look into the |
| ``TimeSensor`` and ``TimeDeltaSensor``. We advise against using |
| dynamic ``start_date`` and recommend using fixed ones. Read the |
| FAQ entry about start_date for more information. |
| :type start_date: datetime.datetime |
| :param end_date: if specified, the scheduler won't go beyond this date |
| :type end_date: datetime.datetime |
| :param depends_on_past: when set to true, task instances will run |
| sequentially and only if the previous instance has succeeded or has been skipped. |
| The task instance for the start_date is allowed to run. |
| :type depends_on_past: bool |
| :param wait_for_downstream: when set to true, an instance of task |
| X will wait for tasks immediately downstream of the previous instance |
| of task X to finish successfully or be skipped before it runs. This is useful if the |
| different instances of a task X alter the same asset, and this asset |
| is used by tasks downstream of task X. Note that depends_on_past |
| is forced to True wherever wait_for_downstream is used. Also note that |
| only tasks *immediately* downstream of the previous task instance are waited |
| for; the statuses of any tasks further downstream are ignored. |
| :type wait_for_downstream: bool |
| :param dag: a reference to the dag the task is attached to (if any) |
| :type dag: airflow.models.DAG |
| :param priority_weight: priority weight of this task against other task. |
| This allows the executor to trigger higher priority tasks before |
| others when things get backed up. Set priority_weight as a higher |
| number for more important tasks. |
| :type priority_weight: int |
| :param weight_rule: weighting method used for the effective total |
| priority weight of the task. Options are: |
| ``{ downstream | upstream | absolute }`` default is ``downstream`` |
| When set to ``downstream`` the effective weight of the task is the |
| aggregate sum of all downstream descendants. As a result, upstream |
| tasks will have higher weight and will be scheduled more aggressively |
| when using positive weight values. This is useful when you have |
| multiple dag run instances and desire to have all upstream tasks to |
| complete for all runs before each dag can continue processing |
| downstream tasks. When set to ``upstream`` the effective weight is the |
| aggregate sum of all upstream ancestors. This is the opposite where |
| downstream tasks have higher weight and will be scheduled more |
| aggressively when using positive weight values. This is useful when you |
| have multiple dag run instances and prefer to have each dag complete |
| before starting upstream tasks of other dags. When set to |
| ``absolute``, the effective weight is the exact ``priority_weight`` |
| specified without additional weighting. You may want to do this when |
| you know exactly what priority weight each task should have. |
| Additionally, when set to ``absolute``, there is bonus effect of |
| significantly speeding up the task creation process as for very large |
| DAGS. Options can be set as string or using the constants defined in |
| the static class ``airflow.utils.WeightRule`` |
| :type weight_rule: str |
| :param queue: which queue to target when running this job. Not |
| all executors implement queue management, the CeleryExecutor |
| does support targeting specific queues. |
| :type queue: str |
| :param pool: the slot pool this task should run in, slot pools are a |
| way to limit concurrency for certain tasks |
| :type pool: str |
| :param pool_slots: the number of pool slots this task should use (>= 1) |
| Values less than 1 are not allowed. |
| :type pool_slots: int |
| :param sla: time by which the job is expected to succeed. Note that |
| this represents the ``timedelta`` after the period is closed. For |
| example if you set an SLA of 1 hour, the scheduler would send an email |
| soon after 1:00AM on the ``2016-01-02`` if the ``2016-01-01`` instance |
| has not succeeded yet. |
| The scheduler pays special attention for jobs with an SLA and |
| sends alert |
| emails for sla misses. SLA misses are also recorded in the database |
| for future reference. All tasks that share the same SLA time |
| get bundled in a single email, sent soon after that time. SLA |
| notification are sent once and only once for each task instance. |
| :type sla: datetime.timedelta |
| :param execution_timeout: max time allowed for the execution of |
| this task instance, if it goes beyond it will raise and fail. |
| :type execution_timeout: datetime.timedelta |
| :param on_failure_callback: a function to be called when a task instance |
| of this task fails. a context dictionary is passed as a single |
| parameter to this function. Context contains references to related |
| objects to the task instance and is documented under the macros |
| section of the API. |
| :type on_failure_callback: TaskStateChangeCallback |
| :param on_execute_callback: much like the ``on_failure_callback`` except |
| that it is executed right before the task is executed. |
| :type on_execute_callback: TaskStateChangeCallback |
| :param on_retry_callback: much like the ``on_failure_callback`` except |
| that it is executed when retries occur. |
| :type on_retry_callback: TaskStateChangeCallback |
| :param on_success_callback: much like the ``on_failure_callback`` except |
| that it is executed when the task succeeds. |
| :type on_success_callback: TaskStateChangeCallback |
| :param trigger_rule: defines the rule by which dependencies are applied |
| for the task to get triggered. Options are: |
| ``{ all_success | all_failed | all_done | one_success | |
| one_failed | none_failed | none_failed_or_skipped | none_skipped | dummy}`` |
| default is ``all_success``. Options can be set as string or |
| using the constants defined in the static class |
| ``airflow.utils.TriggerRule`` |
| :type trigger_rule: str |
| :param resources: A map of resource parameter names (the argument names of the |
| Resources constructor) to their values. |
| :type resources: dict |
| :param run_as_user: unix username to impersonate while running the task |
| :type run_as_user: str |
| :param task_concurrency: When set, a task will be able to limit the concurrent |
| runs across execution_dates |
| :type task_concurrency: int |
| :param executor_config: Additional task-level configuration parameters that are |
| interpreted by a specific executor. Parameters are namespaced by the name of |
| executor. |
| |
| **Example**: to run this task in a specific docker container through |
| the KubernetesExecutor :: |
| |
| MyOperator(..., |
| executor_config={ |
| "KubernetesExecutor": |
| {"image": "myCustomDockerImage"} |
| } |
| ) |
| |
| :type executor_config: dict |
| :param do_xcom_push: if True, an XCom is pushed containing the Operator's |
| result |
| :type do_xcom_push: bool |
| :param doc: Add documentation or notes to your Task objects that is visible in |
| Task Instance details View in the Webserver |
| :type doc: str |
| :param doc_md: Add documentation (in Markdown format) or notes to your Task objects |
| that is visible in Task Instance details View in the Webserver |
| :type doc_md: str |
| :param doc_rst: Add documentation (in RST format) or notes to your Task objects |
| that is visible in Task Instance details View in the Webserver |
| :type doc_rst: str |
| :param doc_json: Add documentation (in JSON format) or notes to your Task objects |
| that is visible in Task Instance details View in the Webserver |
| :type doc_json: str |
| :param doc_yaml: Add documentation (in YAML format) or notes to your Task objects |
| that is visible in Task Instance details View in the Webserver |
| :type doc_yaml: str |
| |
| .. attribute:: template_fields |
| :annotation: :Iterable[str] = [] |
| |
| |
| |
| .. attribute:: template_ext |
| :annotation: :Iterable[str] = [] |
| |
| |
| |
| .. attribute:: template_fields_renderers |
| :annotation: :Dict[str, str] |
| |
| |
| |
| .. attribute:: ui_color |
| :annotation: :str = #fff |
| |
| |
| |
| .. attribute:: ui_fgcolor |
| :annotation: :str = #000 |
| |
| |
| |
| .. attribute:: pool |
| :annotation: :str = |
| |
| |
| |
| .. attribute:: _base_operator_shallow_copy_attrs |
| :annotation: :Tuple[str, ...] = ['user_defined_macros', 'user_defined_filters', 'params', '_log'] |
| |
| |
| |
| .. attribute:: shallow_copy_attrs |
| :annotation: :Tuple[str, ...] = [] |
| |
| |
| |
| .. attribute:: operator_extra_links |
| :annotation: :Iterable['BaseOperatorLink'] = [] |
| |
| |
| |
| .. attribute:: __serialized_fields |
| :annotation: :Optional[FrozenSet[str]] |
| |
| |
| |
| .. attribute:: _comps |
| |
| |
| |
| |
| .. attribute:: supports_lineage |
| :annotation: = False |
| |
| |
| |
| .. attribute:: __instantiated |
| :annotation: = False |
| |
| |
| |
| .. attribute:: _lock_for_execution |
| :annotation: = False |
| |
| |
| |
| .. attribute:: dag |
| |
| |
| Returns the Operator's DAG if set, otherwise raises an error |
| |
| |
| .. attribute:: dag_id |
| |
| |
| Returns dag id if it has one or an adhoc + owner |
| |
| |
| .. attribute:: deps |
| :annotation: :Iterable[BaseTIDep] |
| |
| Returns the set of dependencies for the operator. These differ from execution |
| context dependencies in that they are specific to tasks and can be |
| extended/overridden by subclasses. |
| |
| |
| .. attribute:: priority_weight_total |
| |
| |
| Total priority weight for the task. It might include all upstream or downstream tasks. |
| depending on the weight rule. |
| |
| - WeightRule.ABSOLUTE - only own weight |
| - WeightRule.DOWNSTREAM - adds priority weight of all downstream tasks |
| - WeightRule.UPSTREAM - adds priority weight of all upstream tasks |
| |
| |
| .. attribute:: upstream_list |
| |
| |
| @property: list of tasks directly upstream |
| |
| |
| .. attribute:: upstream_task_ids |
| |
| |
| @property: set of ids of tasks directly upstream |
| |
| |
| .. attribute:: downstream_list |
| |
| |
| @property: list of tasks directly downstream |
| |
| |
| .. attribute:: downstream_task_ids |
| |
| |
| @property: set of ids of tasks directly downstream |
| |
| |
| .. attribute:: task_type |
| |
| |
| @property: type of the task |
| |
| |
| .. attribute:: roots |
| |
| |
| Required by TaskMixin |
| |
| |
| .. attribute:: leaves |
| |
| |
| Required by TaskMixin |
| |
| |
| .. attribute:: output |
| |
| |
| Returns reference to XCom pushed by current operator |
| |
| |
| .. attribute:: inherits_from_dummy_operator |
| |
| |
| Used to determine if an Operator is inherited from DummyOperator |
| |
| |
| |
| .. method:: __eq__(self, other) |
| |
| |
| |
| |
| .. method:: __ne__(self, other) |
| |
| |
| |
| |
| .. method:: __hash__(self) |
| |
| |
| |
| |
| .. method:: __or__(self, other) |
| |
| Called for [This Operator] | [Operator], The inlets of other |
| will be set to pickup the outlets from this operator. Other will |
| be set as a downstream task of this operator. |
| |
| |
| |
| |
| .. method:: __gt__(self, other) |
| |
| Called for [Operator] > [Outlet], so that if other is an attr annotated object |
| it is set as an outlet of this Operator. |
| |
| |
| |
| |
| .. method:: __lt__(self, other) |
| |
| Called for [Inlet] > [Operator] or [Operator] < [Inlet], so that if other is |
| an attr annotated object it is set as an inlet to this operator |
| |
| |
| |
| |
| .. method:: __setattr__(self, key, value) |
| |
| |
| |
| |
| .. method:: add_inlets(self, inlets: Iterable[Any]) |
| |
| Sets inlets to this operator |
| |
| |
| |
| |
| .. method:: add_outlets(self, outlets: Iterable[Any]) |
| |
| Defines the outlets of this operator |
| |
| |
| |
| |
| .. method:: get_inlet_defs(self) |
| |
| :return: list of inlets defined for this operator |
| |
| |
| |
| |
| .. method:: get_outlet_defs(self) |
| |
| :return: list of outlets defined for this operator |
| |
| |
| |
| |
| .. method:: has_dag(self) |
| |
| Returns True if the Operator has been assigned to a DAG. |
| |
| |
| |
| |
| .. method:: prepare_for_execution(self) |
| |
| Lock task for execution to disable custom action in __setattr__ and |
| returns a copy of the task |
| |
| |
| |
| |
| .. method:: set_xcomargs_dependencies(self) |
| |
| Resolves upstream dependencies of a task. In this way passing an ``XComArg`` |
| as value for a template field will result in creating upstream relation between |
| two tasks. |
| |
| **Example**: :: |
| |
| with DAG(...): |
| generate_content = GenerateContentOperator(task_id="generate_content") |
| send_email = EmailOperator(..., html_content=generate_content.output) |
| |
| # This is equivalent to |
| with DAG(...): |
| generate_content = GenerateContentOperator(task_id="generate_content") |
| send_email = EmailOperator( |
| ..., html_content="{{ task_instance.xcom_pull('generate_content') }}" |
| ) |
| generate_content >> send_email |
| |
| |
| |
| |
| .. method:: operator_extra_link_dict(self) |
| |
| Returns dictionary of all extra links for the operator |
| |
| |
| |
| |
| .. method:: global_operator_extra_link_dict(self) |
| |
| Returns dictionary of all global extra links |
| |
| |
| |
| |
| .. method:: pre_execute(self, context: Any) |
| |
| This hook is triggered right before self.execute() is called. |
| |
| |
| |
| |
| .. method:: execute(self, context: Any) |
| |
| This is the main method to derive when creating an operator. |
| Context is the same dictionary used as when rendering jinja templates. |
| |
| Refer to get_template_context for more context. |
| |
| |
| |
| |
| .. method:: post_execute(self, context: Any, result: Any = None) |
| |
| This hook is triggered right after self.execute() is called. |
| It is passed the execution context and any results returned by the |
| operator. |
| |
| |
| |
| |
| .. method:: on_kill(self) |
| |
| Override this method to cleanup subprocesses when a task instance |
| gets killed. Any use of the threading, subprocess or multiprocessing |
| module within an operator needs to be cleaned up or it will leave |
| ghost processes behind. |
| |
| |
| |
| |
| .. method:: __deepcopy__(self, memo) |
| |
| Hack sorting double chained task lists by task_id to avoid hitting |
| max_depth on deepcopy operations. |
| |
| |
| |
| |
| .. method:: __getstate__(self) |
| |
| |
| |
| |
| .. method:: __setstate__(self, state) |
| |
| |
| |
| |
| .. method:: render_template_fields(self, context: Dict, jinja_env: Optional[jinja2.Environment] = None) |
| |
| Template all attributes listed in template_fields. Note this operation is irreversible. |
| |
| :param context: Dict with values to apply on content |
| :type context: dict |
| :param jinja_env: Jinja environment |
| :type jinja_env: jinja2.Environment |
| |
| |
| |
| |
| .. method:: _do_render_template_fields(self, parent: Any, template_fields: Iterable[str], context: Dict, jinja_env: jinja2.Environment, seen_oids: Set) |
| |
| |
| |
| |
| .. method:: render_template(self, content: Any, context: Dict, jinja_env: Optional[jinja2.Environment] = None, seen_oids: Optional[Set] = None) |
| |
| Render a templated string. The content can be a collection holding multiple templated strings and will |
| be templated recursively. |
| |
| :param content: Content to template. Only strings can be templated (may be inside collection). |
| :type content: Any |
| :param context: Dict with values to apply on templated content |
| :type context: dict |
| :param jinja_env: Jinja environment. Can be provided to avoid re-creating Jinja environments during |
| recursion. |
| :type jinja_env: jinja2.Environment |
| :param seen_oids: template fields already rendered (to avoid RecursionError on circular dependencies) |
| :type seen_oids: set |
| :return: Templated content |
| |
| |
| |
| |
| .. method:: _render_nested_template_fields(self, content: Any, context: Dict, jinja_env: jinja2.Environment, seen_oids: Set) |
| |
| |
| |
| |
| .. method:: get_template_env(self) |
| |
| Fetch a Jinja template environment from the DAG or instantiate empty environment if no DAG. |
| |
| |
| |
| |
| .. method:: prepare_template(self) |
| |
| Hook that is triggered after the templated fields get replaced |
| by their content. If you need your operator to alter the |
| content of the file before the template is rendered, |
| it should override this method to do so. |
| |
| |
| |
| |
| .. method:: resolve_template_files(self) |
| |
| Getting the content of files for template_field / template_ext |
| |
| |
| |
| |
| .. method:: clear(self, start_date: Optional[datetime] = None, end_date: Optional[datetime] = None, upstream: bool = False, downstream: bool = False, session: Session = None) |
| |
| Clears the state of task instances associated with the task, following |
| the parameters specified. |
| |
| |
| |
| |
| .. method:: get_task_instances(self, start_date: Optional[datetime] = None, end_date: Optional[datetime] = None, session: Session = None) |
| |
| Get a set of task instance related to this task for a specific date |
| range. |
| |
| |
| |
| |
| .. method:: get_flat_relative_ids(self, upstream: bool = False, found_descendants: Optional[Set[str]] = None) |
| |
| Get a flat set of relatives' ids, either upstream or downstream. |
| |
| |
| |
| |
| .. method:: get_flat_relatives(self, upstream: bool = False) |
| |
| Get a flat list of relatives, either upstream or downstream. |
| |
| |
| |
| |
| .. method:: run(self, start_date: Optional[datetime] = None, end_date: Optional[datetime] = None, ignore_first_depends_on_past: bool = True, ignore_ti_state: bool = False, mark_success: bool = False) |
| |
| Run a set of task instances for a date range. |
| |
| |
| |
| |
| .. method:: dry_run(self) |
| |
| Performs dry run for the operator - just render template fields. |
| |
| |
| |
| |
| .. method:: get_direct_relative_ids(self, upstream: bool = False) |
| |
| Get set of the direct relative ids to the current task, upstream or |
| downstream. |
| |
| |
| |
| |
| .. method:: get_direct_relatives(self, upstream: bool = False) |
| |
| Get list of the direct relatives to the current task, upstream or |
| downstream. |
| |
| |
| |
| |
| .. method:: __repr__(self) |
| |
| |
| |
| |
| .. method:: add_only_new(self, item_set: Set[str], item: str, dag_id: str) |
| |
| Adds only new items to item set |
| |
| |
| |
| |
| .. method:: _set_relatives(self, task_or_task_list: Union[TaskMixin, Sequence[TaskMixin]], upstream: bool = False, edge_modifier: Optional[EdgeModifier] = None) |
| |
| Sets relatives for the task or task list. |
| |
| |
| |
| |
| .. method:: set_downstream(self, task_or_task_list: Union[TaskMixin, Sequence[TaskMixin]], edge_modifier: Optional[EdgeModifier] = None) |
| |
| Set a task or a task list to be directly downstream from the current |
| task. Required by TaskMixin. |
| |
| |
| |
| |
| .. method:: set_upstream(self, task_or_task_list: Union[TaskMixin, Sequence[TaskMixin]], edge_modifier: Optional[EdgeModifier] = None) |
| |
| Set a task or a task list to be directly upstream from the current |
| task. Required by TaskMixin. |
| |
| |
| |
| |
| .. staticmethod:: xcom_push(context: Any, key: str, value: Any, execution_date: Optional[datetime] = None) |
| |
| Make an XCom available for tasks to pull. |
| |
| :param context: Execution Context Dictionary |
| :type: Any |
| :param key: A key for the XCom |
| :type key: str |
| :param value: A value for the XCom. The value is pickled and stored |
| in the database. |
| :type value: any pickleable object |
| :param execution_date: if provided, the XCom will not be visible until |
| this date. This can be used, for example, to send a message to a |
| task on a future date without it being immediately visible. |
| :type execution_date: datetime |
| |
| |
| |
| |
| .. staticmethod:: xcom_pull(context: Any, task_ids: Optional[List[str]] = None, dag_id: Optional[str] = None, key: str = XCOM_RETURN_KEY, include_prior_dates: Optional[bool] = None) |
| |
| Pull XComs that optionally meet certain criteria. |
| |
| The default value for `key` limits the search to XComs |
| that were returned by other tasks (as opposed to those that were pushed |
| manually). To remove this filter, pass key=None (or any desired value). |
| |
| If a single task_id string is provided, the result is the value of the |
| most recent matching XCom from that task_id. If multiple task_ids are |
| provided, a tuple of matching values is returned. None is returned |
| whenever no matches are found. |
| |
| :param context: Execution Context Dictionary |
| :type: Any |
| :param key: A key for the XCom. If provided, only XComs with matching |
| keys will be returned. The default key is 'return_value', also |
| available as a constant XCOM_RETURN_KEY. This key is automatically |
| given to XComs returned by tasks (as opposed to being pushed |
| manually). To remove the filter, pass key=None. |
| :type key: str |
| :param task_ids: Only XComs from tasks with matching ids will be |
| pulled. Can pass None to remove the filter. |
| :type task_ids: str or iterable of strings (representing task_ids) |
| :param dag_id: If provided, only pulls XComs from this DAG. |
| If None (default), the DAG of the calling task is used. |
| :type dag_id: str |
| :param include_prior_dates: If False, only XComs from the current |
| execution_date are returned. If True, XComs from previous dates |
| are returned as well. |
| :type include_prior_dates: bool |
| |
| |
| |
| |
| .. method:: extra_links(self) |
| |
| @property: extra links for the task |
| |
| |
| |
| |
| .. method:: get_extra_links(self, dttm: datetime, link_name: str) |
| |
| For an operator, gets the URL that the external links specified in |
| `extra_links` should point to. |
| |
| :raise ValueError: The error message of a ValueError will be passed on through to |
| the fronted to show up as a tooltip on the disabled link |
| :param dttm: The datetime parsed execution date for the URL being searched for |
| :param link_name: The name of the link we're looking for the URL for. Should be |
| one of the options specified in `extra_links` |
| :return: A URL |
| |
| |
| |
| |
| .. classmethod:: get_serialized_fields(cls) |
| |
| Stringified DAGs and operators contain exactly these fields. |
| |
| |
| |
| |
| .. method:: is_smart_sensor_compatible(self) |
| |
| Return if this operator can use smart service. Default False. |
| |
| |
| |
| |
| .. py:class:: BaseOperatorLink |
| |
| Abstract base class that defines how we get an operator link. |
| |
| .. attribute:: operators |
| :annotation: :ClassVar[List[Type[BaseOperator]]] = [] |
| |
| This property will be used by Airflow Plugins to find the Operators to which you want |
| to assign this Operator Link |
| |
| :return: List of Operator classes used by task for which you want to create extra link |
| |
| |
| .. attribute:: name |
| |
| |
| Name of the link. This will be the button name on the task UI. |
| |
| :return: link name |
| |
| |
| |
| .. method:: get_link(self, operator: BaseOperator, dttm: datetime) |
| |
| Link to external system. |
| |
| :param operator: airflow operator |
| :param dttm: datetime |
| :return: link to external system |
| |
| |
| |
| |
| .. py:class:: Connection(conn_id: Optional[str] = None, conn_type: Optional[str] = None, description: Optional[str] = None, host: Optional[str] = None, login: Optional[str] = None, password: Optional[str] = None, schema: Optional[str] = None, port: Optional[int] = None, extra: Optional[Union[str, dict]] = None, uri: Optional[str] = None) |
| |
| Bases: :class:`airflow.models.base.Base`, :class:`airflow.utils.log.logging_mixin.LoggingMixin` |
| |
| Placeholder to store information about different database instances |
| connection information. The idea here is that scripts use references to |
| database instances (conn_id) instead of hard coding hostname, logins and |
| passwords when using operators or hooks. |
| |
| .. seealso:: |
| For more information on how to use this class, see: :doc:`/howto/connection` |
| |
| :param conn_id: The connection ID. |
| :type conn_id: str |
| :param conn_type: The connection type. |
| :type conn_type: str |
| :param description: The connection description. |
| :type description: str |
| :param host: The host. |
| :type host: str |
| :param login: The login. |
| :type login: str |
| :param password: The password. |
| :type password: str |
| :param schema: The schema. |
| :type schema: str |
| :param port: The port number. |
| :type port: int |
| :param extra: Extra metadata. Non-standard data such as private/SSH keys can be saved here. JSON |
| encoded object. |
| :type extra: str |
| :param uri: URI address describing connection parameters. |
| :type uri: str |
| |
| .. attribute:: EXTRA_KEY |
| :annotation: = __extra__ |
| |
| |
| |
| .. attribute:: __tablename__ |
| :annotation: = connection |
| |
| |
| |
| .. attribute:: id |
| |
| |
| |
| |
| .. attribute:: conn_id |
| |
| |
| |
| |
| .. attribute:: conn_type |
| |
| |
| |
| |
| .. attribute:: description |
| |
| |
| |
| |
| .. attribute:: host |
| |
| |
| |
| |
| .. attribute:: schema |
| |
| |
| |
| |
| .. attribute:: login |
| |
| |
| |
| |
| .. attribute:: _password |
| |
| |
| |
| |
| .. attribute:: port |
| |
| |
| |
| |
| .. attribute:: is_encrypted |
| |
| |
| |
| |
| .. attribute:: is_extra_encrypted |
| |
| |
| |
| |
| .. attribute:: _extra |
| |
| |
| |
| |
| .. attribute:: password |
| |
| |
| Password. The value is decrypted/encrypted when reading/setting the value. |
| |
| |
| .. attribute:: extra |
| |
| |
| Extra data. The value is decrypted/encrypted when reading/setting the value. |
| |
| |
| .. attribute:: extra_dejson |
| |
| |
| Returns the extra property by deserializing json. |
| |
| |
| |
| .. method:: on_db_load(self) |
| |
| |
| |
| |
| .. method:: parse_from_uri(self, **uri) |
| |
| This method is deprecated. Please use uri parameter in constructor. |
| |
| |
| |
| |
| .. method:: _parse_from_uri(self, uri: str) |
| |
| |
| |
| |
| .. method:: get_uri(self) |
| |
| Return connection in URI format |
| |
| |
| |
| |
| .. method:: get_password(self) |
| |
| Return encrypted password. |
| |
| |
| |
| |
| .. method:: set_password(self, value: Optional[str]) |
| |
| Encrypt password and set in object attribute. |
| |
| |
| |
| |
| .. method:: get_extra(self) |
| |
| Return encrypted extra-data. |
| |
| |
| |
| |
| .. method:: set_extra(self, value: str) |
| |
| Encrypt extra-data and save in object attribute to object. |
| |
| |
| |
| |
| .. method:: rotate_fernet_key(self) |
| |
| Encrypts data with a new key. See: :ref:`security/fernet` |
| |
| |
| |
| |
| .. method:: get_hook(self) |
| |
| Return hook based on conn_type. |
| |
| |
| |
| |
| .. method:: __repr__(self) |
| |
| |
| |
| |
| .. method:: log_info(self) |
| |
| This method is deprecated. You can read each field individually or use the |
| default representation (`__repr__`). |
| |
| |
| |
| |
| .. method:: debug_info(self) |
| |
| This method is deprecated. You can read each field individually or use the |
| default representation (`__repr__`). |
| |
| |
| |
| |
| .. classmethod:: get_connection_from_secrets(cls, conn_id: str) |
| |
| Get connection by conn_id. |
| |
| :param conn_id: connection id |
| :return: connection |
| |
| |
| |
| |
| .. py:class:: DAG(dag_id: str, description: Optional[str] = None, schedule_interval: Optional[ScheduleInterval] = timedelta(days=1), start_date: Optional[datetime] = None, end_date: Optional[datetime] = None, full_filepath: Optional[str] = None, template_searchpath: Optional[Union[str, Iterable[str]]] = None, template_undefined: Type[jinja2.StrictUndefined] = jinja2.StrictUndefined, user_defined_macros: Optional[Dict] = None, user_defined_filters: Optional[Dict] = None, default_args: Optional[Dict] = None, concurrency: int = conf.getint('core', 'dag_concurrency'), max_active_runs: int = conf.getint('core', 'max_active_runs_per_dag'), dagrun_timeout: Optional[timedelta] = None, sla_miss_callback: Optional[Callable[['DAG', str, str, List[str], List[TaskInstance]], None]] = None, default_view: str = conf.get('webserver', 'dag_default_view').lower(), orientation: str = conf.get('webserver', 'dag_orientation'), catchup: bool = conf.getboolean('scheduler', 'catchup_by_default'), on_success_callback: Optional[DagStateChangeCallback] = None, on_failure_callback: Optional[DagStateChangeCallback] = None, doc_md: Optional[str] = None, params: Optional[Dict] = None, access_control: Optional[Dict] = None, is_paused_upon_creation: Optional[bool] = None, jinja_environment_kwargs: Optional[Dict] = None, render_template_as_native_obj: bool = False, tags: Optional[List[str]] = None) |
| |
| Bases: :class:`airflow.utils.log.logging_mixin.LoggingMixin` |
| |
| A dag (directed acyclic graph) is a collection of tasks with directional |
| dependencies. A dag also has a schedule, a start date and an end date |
| (optional). For each schedule, (say daily or hourly), the DAG needs to run |
| each individual tasks as their dependencies are met. Certain tasks have |
| the property of depending on their own past, meaning that they can't run |
| until their previous schedule (and upstream tasks) are completed. |
| |
| DAGs essentially act as namespaces for tasks. A task_id can only be |
| added once to a DAG. |
| |
| :param dag_id: The id of the DAG; must consist exclusively of alphanumeric |
| characters, dashes, dots and underscores (all ASCII) |
| :type dag_id: str |
| :param description: The description for the DAG to e.g. be shown on the webserver |
| :type description: str |
| :param schedule_interval: Defines how often that DAG runs, this |
| timedelta object gets added to your latest task instance's |
| execution_date to figure out the next schedule |
| :type schedule_interval: datetime.timedelta or |
| dateutil.relativedelta.relativedelta or str that acts as a cron |
| expression |
| :param start_date: The timestamp from which the scheduler will |
| attempt to backfill |
| :type start_date: datetime.datetime |
| :param end_date: A date beyond which your DAG won't run, leave to None |
| for open ended scheduling |
| :type end_date: datetime.datetime |
| :param template_searchpath: This list of folders (non relative) |
| defines where jinja will look for your templates. Order matters. |
| Note that jinja/airflow includes the path of your DAG file by |
| default |
| :type template_searchpath: str or list[str] |
| :param template_undefined: Template undefined type. |
| :type template_undefined: jinja2.StrictUndefined |
| :param user_defined_macros: a dictionary of macros that will be exposed |
| in your jinja templates. For example, passing ``dict(foo='bar')`` |
| to this argument allows you to ``{{ foo }}`` in all jinja |
| templates related to this DAG. Note that you can pass any |
| type of object here. |
| :type user_defined_macros: dict |
| :param user_defined_filters: a dictionary of filters that will be exposed |
| in your jinja templates. For example, passing |
| ``dict(hello=lambda name: 'Hello %s' % name)`` to this argument allows |
| you to ``{{ 'world' | hello }}`` in all jinja templates related to |
| this DAG. |
| :type user_defined_filters: dict |
| :param default_args: A dictionary of default parameters to be used |
| as constructor keyword parameters when initialising operators. |
| Note that operators have the same hook, and precede those defined |
| here, meaning that if your dict contains `'depends_on_past': True` |
| here and `'depends_on_past': False` in the operator's call |
| `default_args`, the actual value will be `False`. |
| :type default_args: dict |
| :param params: a dictionary of DAG level parameters that are made |
| accessible in templates, namespaced under `params`. These |
| params can be overridden at the task level. |
| :type params: dict |
| :param concurrency: the number of task instances allowed to run |
| concurrently |
| :type concurrency: int |
| :param max_active_runs: maximum number of active DAG runs, beyond this |
| number of DAG runs in a running state, the scheduler won't create |
| new active DAG runs |
| :type max_active_runs: int |
| :param dagrun_timeout: specify how long a DagRun should be up before |
| timing out / failing, so that new DagRuns can be created. The timeout |
| is only enforced for scheduled DagRuns. |
| :type dagrun_timeout: datetime.timedelta |
| :param sla_miss_callback: specify a function to call when reporting SLA |
| timeouts. See :ref:`sla_miss_callback<concepts:sla_miss_callback>` for |
| more information about the function signature and parameters that are |
| passed to the callback. |
| :type sla_miss_callback: callable |
| :param default_view: Specify DAG default view (tree, graph, duration, |
| gantt, landing_times), default tree |
| :type default_view: str |
| :param orientation: Specify DAG orientation in graph view (LR, TB, RL, BT), default LR |
| :type orientation: str |
| :param catchup: Perform scheduler catchup (or only run latest)? Defaults to True |
| :type catchup: bool |
| :param on_failure_callback: A function to be called when a DagRun of this dag fails. |
| A context dictionary is passed as a single parameter to this function. |
| :type on_failure_callback: callable |
| :param on_success_callback: Much like the ``on_failure_callback`` except |
| that it is executed when the dag succeeds. |
| :type on_success_callback: callable |
| :param access_control: Specify optional DAG-level permissions, e.g., |
| "{'role1': {'can_read'}, 'role2': {'can_read', 'can_edit'}}" |
| :type access_control: dict |
| :param is_paused_upon_creation: Specifies if the dag is paused when created for the first time. |
| If the dag exists already, this flag will be ignored. If this optional parameter |
| is not specified, the global config setting will be used. |
| :type is_paused_upon_creation: bool or None |
| :param jinja_environment_kwargs: additional configuration options to be passed to Jinja |
| ``Environment`` for template rendering |
| |
| **Example**: to avoid Jinja from removing a trailing newline from template strings :: |
| |
| DAG(dag_id='my-dag', |
| jinja_environment_kwargs={ |
| 'keep_trailing_newline': True, |
| # some other jinja2 Environment options here |
| } |
| ) |
| |
| **See**: `Jinja Environment documentation |
| <https://jinja.palletsprojects.com/en/2.11.x/api/#jinja2.Environment>`_ |
| |
| :type jinja_environment_kwargs: dict |
| :param tags: List of tags to help filtering DAGS in the UI. |
| :type tags: List[str] |
| |
| .. attribute:: _comps |
| |
| |
| |
| |
| .. attribute:: __serialized_fields |
| :annotation: :Optional[FrozenSet[str]] |
| |
| |
| |
| .. attribute:: dag_id |
| |
| |
| |
| |
| .. attribute:: full_filepath |
| |
| |
| |
| |
| .. attribute:: concurrency |
| |
| |
| |
| |
| .. attribute:: access_control |
| |
| |
| |
| |
| .. attribute:: description |
| |
| |
| |
| |
| .. attribute:: default_view |
| |
| |
| |
| |
| .. attribute:: pickle_id |
| |
| |
| |
| |
| .. attribute:: tasks |
| |
| |
| |
| |
| .. attribute:: task_ids |
| |
| |
| |
| |
| .. attribute:: task_group |
| |
| |
| |
| |
| .. attribute:: filepath |
| |
| |
| File location of where the dag object is instantiated |
| |
| |
| .. attribute:: folder |
| |
| |
| Folder location of where the DAG object is instantiated. |
| |
| |
| .. attribute:: owner |
| |
| |
| Return list of all owners found in DAG tasks. |
| |
| :return: Comma separated list of owners in DAG tasks |
| :rtype: str |
| |
| |
| .. attribute:: allow_future_exec_dates |
| |
| |
| |
| |
| .. attribute:: concurrency_reached |
| |
| |
| This attribute is deprecated. Please use `airflow.models.DAG.get_concurrency_reached` method. |
| |
| |
| .. attribute:: is_paused |
| |
| |
| This attribute is deprecated. Please use `airflow.models.DAG.get_is_paused` method. |
| |
| |
| .. attribute:: normalized_schedule_interval |
| |
| |
| Returns Normalized Schedule Interval. This is used internally by the Scheduler to |
| schedule DAGs. |
| |
| 1. Converts Cron Preset to a Cron Expression (e.g ``@monthly`` to ``0 0 1 * *``) |
| 2. If Schedule Interval is "@once" return "None" |
| 3. If not (1) or (2) returns schedule_interval |
| |
| |
| .. attribute:: latest_execution_date |
| |
| |
| This attribute is deprecated. Please use `airflow.models.DAG.get_latest_execution_date` method. |
| |
| |
| .. attribute:: subdags |
| |
| |
| Returns a list of the subdag objects associated to this DAG |
| |
| |
| .. attribute:: roots |
| |
| |
| Return nodes with no parents. These are first to execute and are called roots or root nodes. |
| |
| |
| .. attribute:: leaves |
| |
| |
| Return nodes with no children. These are last to execute and are called leaves or leaf nodes. |
| |
| |
| .. attribute:: task |
| |
| |
| |
| |
| |
| .. method:: __repr__(self) |
| |
| |
| |
| |
| .. method:: __eq__(self, other) |
| |
| |
| |
| |
| .. method:: __ne__(self, other) |
| |
| |
| |
| |
| .. method:: __lt__(self, other) |
| |
| |
| |
| |
| .. method:: __hash__(self) |
| |
| |
| |
| |
| .. method:: __enter__(self) |
| |
| |
| |
| |
| .. method:: __exit__(self, _type, _value, _tb) |
| |
| |
| |
| |
| .. staticmethod:: _upgrade_outdated_dag_access_control(access_control=None) |
| |
| Looks for outdated dag level permissions (can_dag_read and can_dag_edit) in DAG |
| access_controls (for example, {'role1': {'can_dag_read'}, 'role2': {'can_dag_read', 'can_dag_edit'}}) |
| and replaces them with updated permissions (can_read and can_edit). |
| |
| |
| |
| |
| .. method:: date_range(self, start_date: datetime, num: Optional[int] = None, end_date: Optional[datetime] = timezone.utcnow()) |
| |
| |
| |
| |
| .. method:: is_fixed_time_schedule(self) |
| |
| Figures out if the DAG schedule has a fixed time (e.g. 3 AM). |
| |
| :return: True if the schedule has a fixed time, False if not. |
| |
| |
| |
| |
| .. method:: following_schedule(self, dttm) |
| |
| Calculates the following schedule for this dag in UTC. |
| |
| :param dttm: utc datetime |
| :return: utc datetime |
| |
| |
| |
| |
| .. method:: previous_schedule(self, dttm) |
| |
| Calculates the previous schedule for this dag in UTC |
| |
| :param dttm: utc datetime |
| :return: utc datetime |
| |
| |
| |
| |
| .. method:: next_dagrun_info(self, date_last_automated_dagrun: Optional[pendulum.DateTime]) |
| |
| Get information about the next DagRun of this dag after ``date_last_automated_dagrun`` -- the |
| execution date, and the earliest it could be scheduled |
| |
| :param date_last_automated_dagrun: The max(execution_date) of existing |
| "automated" DagRuns for this dag (scheduled or backfill, but not |
| manual) |
| |
| |
| |
| |
| .. method:: next_dagrun_after_date(self, date_last_automated_dagrun: Optional[pendulum.DateTime]) |
| |
| Get the next execution date after the given ``date_last_automated_dagrun``, according to |
| schedule_interval, start_date, end_date etc. This doesn't check max active run or any other |
| "concurrency" type limits, it only performs calculations based on the various date and interval fields |
| of this dag and it's tasks. |
| |
| :param date_last_automated_dagrun: The execution_date of the last scheduler or |
| backfill triggered run for this dag |
| :type date_last_automated_dagrun: pendulum.Pendulum |
| |
| |
| |
| |
| .. method:: get_run_dates(self, start_date, end_date=None) |
| |
| Returns a list of dates between the interval received as parameter using this |
| dag's schedule interval. Returned dates can be used for execution dates. |
| |
| :param start_date: the start date of the interval |
| :type start_date: datetime |
| :param end_date: the end date of the interval, defaults to timezone.utcnow() |
| :type end_date: datetime |
| :return: a list of dates within the interval following the dag's schedule |
| :rtype: list |
| |
| |
| |
| |
| .. method:: normalize_schedule(self, dttm) |
| |
| Returns dttm + interval unless dttm is first interval then it returns dttm |
| |
| |
| |
| |
| .. method:: get_last_dagrun(self, session=None, include_externally_triggered=False) |
| |
| |
| |
| |
| .. method:: has_dag_runs(self, session=None, include_externally_triggered=True) |
| |
| |
| |
| |
| .. method:: param(self, name: str, default=None) |
| |
| Return a DagParam object for current dag. |
| |
| :param name: dag parameter name. |
| :param default: fallback value for dag parameter. |
| :return: DagParam instance for specified name and current dag. |
| |
| |
| |
| |
| .. method:: get_concurrency_reached(self, session=None) |
| |
| Returns a boolean indicating whether the concurrency limit for this DAG |
| has been reached |
| |
| |
| |
| |
| .. method:: get_is_active(self, session=None) |
| |
| Returns a boolean indicating whether this DAG is active |
| |
| |
| |
| |
| .. method:: get_is_paused(self, session=None) |
| |
| Returns a boolean indicating whether this DAG is paused |
| |
| |
| |
| |
| .. method:: handle_callback(self, dagrun, success=True, reason=None, session=None) |
| |
| Triggers the appropriate callback depending on the value of success, namely the |
| on_failure_callback or on_success_callback. This method gets the context of a |
| single TaskInstance part of this DagRun and passes that to the callable along |
| with a 'reason', primarily to differentiate DagRun failures. |
| |
| .. note: The logs end up in |
| ``$AIRFLOW_HOME/logs/scheduler/latest/PROJECT/DAG_FILE.py.log`` |
| |
| :param dagrun: DagRun object |
| :param success: Flag to specify if failure or success callback should be called |
| :param reason: Completion reason |
| :param session: Database session |
| |
| |
| |
| |
| .. method:: get_active_runs(self) |
| |
| Returns a list of dag run execution dates currently running |
| |
| :return: List of execution dates |
| |
| |
| |
| |
| .. method:: get_num_active_runs(self, external_trigger=None, session=None) |
| |
| Returns the number of active "running" dag runs |
| |
| :param external_trigger: True for externally triggered active dag runs |
| :type external_trigger: bool |
| :param session: |
| :return: number greater than 0 for active dag runs |
| |
| |
| |
| |
| .. method:: get_dagrun(self, execution_date, session=None) |
| |
| Returns the dag run for a given execution date if it exists, otherwise |
| none. |
| |
| :param execution_date: The execution date of the DagRun to find. |
| :param session: |
| :return: The DagRun if found, otherwise None. |
| |
| |
| |
| |
| .. method:: get_dagruns_between(self, start_date, end_date, session=None) |
| |
| Returns the list of dag runs between start_date (inclusive) and end_date (inclusive). |
| |
| :param start_date: The starting execution date of the DagRun to find. |
| :param end_date: The ending execution date of the DagRun to find. |
| :param session: |
| :return: The list of DagRuns found. |
| |
| |
| |
| |
| .. method:: get_latest_execution_date(self, session=None) |
| |
| Returns the latest date for which at least one dag run exists |
| |
| |
| |
| |
| .. method:: resolve_template_files(self) |
| |
| |
| |
| |
| .. method:: get_template_env(self) |
| |
| Build a Jinja2 environment. |
| |
| |
| |
| |
| .. method:: set_dependency(self, upstream_task_id, downstream_task_id) |
| |
| Simple utility method to set dependency between two tasks that |
| already have been added to the DAG using add_task() |
| |
| |
| |
| |
| .. method:: get_task_instances(self, start_date=None, end_date=None, state=None, session=None) |
| |
| |
| |
| |
| .. method:: topological_sort(self, include_subdag_tasks: bool = False) |
| |
| Sorts tasks in topographical order, such that a task comes after any of its |
| upstream dependencies. |
| |
| Heavily inspired by: |
| http://blog.jupo.org/2012/04/06/topological-sorting-acyclic-directed-graphs/ |
| |
| :param include_subdag_tasks: whether to include tasks in subdags, default to False |
| :return: list of tasks in topological order |
| |
| |
| |
| |
| .. method:: set_dag_runs_state(self, state: str = State.RUNNING, session: Session = None, start_date: Optional[datetime] = None, end_date: Optional[datetime] = None, dag_ids: List[str] = None) |
| |
| |
| |
| |
| .. method:: clear(self, task_ids=None, start_date=None, end_date=None, only_failed=False, only_running=False, confirm_prompt=False, include_subdags=True, include_parentdag=True, dag_run_state: DagRunState = DagRunState.QUEUED, dry_run=False, session=None, get_tis=False, recursion_depth=0, max_recursion_depth=None, dag_bag=None, visited_external_tis=None) |
| |
| Clears a set of task instances associated with the current dag for |
| a specified date range. |
| |
| :param task_ids: List of task ids to clear |
| :type task_ids: List[str] |
| :param start_date: The minimum execution_date to clear |
| :type start_date: datetime.datetime or None |
| :param end_date: The maximum execution_date to clear |
| :type end_date: datetime.datetime or None |
| :param only_failed: Only clear failed tasks |
| :type only_failed: bool |
| :param only_running: Only clear running tasks. |
| :type only_running: bool |
| :param confirm_prompt: Ask for confirmation |
| :type confirm_prompt: bool |
| :param include_subdags: Clear tasks in subdags and clear external tasks |
| indicated by ExternalTaskMarker |
| :type include_subdags: bool |
| :param include_parentdag: Clear tasks in the parent dag of the subdag. |
| :type include_parentdag: bool |
| :param dag_run_state: state to set DagRun to. If set to False, dagrun state will not |
| be changed. |
| :param dry_run: Find the tasks to clear but don't clear them. |
| :type dry_run: bool |
| :param session: The sqlalchemy session to use |
| :type session: sqlalchemy.orm.session.Session |
| :param get_tis: Return the sqlalchemy query for finding the TaskInstance without clearing the tasks |
| :type get_tis: bool |
| :param recursion_depth: The recursion depth of nested calls to DAG.clear(). |
| :type recursion_depth: int |
| :param max_recursion_depth: The maximum recursion depth allowed. This is determined by the |
| first encountered ExternalTaskMarker. Default is None indicating no ExternalTaskMarker |
| has been encountered. |
| :type max_recursion_depth: int |
| :param dag_bag: The DagBag used to find the dags |
| :type dag_bag: airflow.models.dagbag.DagBag |
| :param visited_external_tis: A set used internally to keep track of the visited TaskInstance when |
| clearing tasks across multiple DAGs linked by ExternalTaskMarker to avoid redundant work. |
| :type visited_external_tis: set |
| |
| |
| |
| |
| .. classmethod:: clear_dags(cls, dags, start_date=None, end_date=None, only_failed=False, only_running=False, confirm_prompt=False, include_subdags=True, include_parentdag=False, dag_run_state=DagRunState.QUEUED, dry_run=False) |
| |
| |
| |
| |
| .. method:: __deepcopy__(self, memo) |
| |
| |
| |
| |
| .. method:: sub_dag(self, *args, **kwargs) |
| |
| This method is deprecated in favor of partial_subset |
| |
| |
| |
| |
| .. method:: partial_subset(self, task_ids_or_regex: Union[str, RePatternType, Iterable[str]], include_downstream=False, include_upstream=True, include_direct_upstream=False) |
| |
| Returns a subset of the current dag as a deep copy of the current dag |
| based on a regex that should match one or many tasks, and includes |
| upstream and downstream neighbours based on the flag passed. |
| |
| :param task_ids_or_regex: Either a list of task_ids, or a regex to |
| match against task ids (as a string, or compiled regex pattern). |
| :type task_ids_or_regex: [str] or str or re.Pattern |
| :param include_downstream: Include all downstream tasks of matched |
| tasks, in addition to matched tasks. |
| :param include_upstream: Include all upstream tasks of matched tasks, |
| in addition to matched tasks. |
| |
| |
| |
| |
| .. method:: has_task(self, task_id: str) |
| |
| |
| |
| |
| .. method:: get_task(self, task_id: str, include_subdags: bool = False) |
| |
| |
| |
| |
| .. method:: pickle_info(self) |
| |
| |
| |
| |
| .. method:: pickle(self, session=None) |
| |
| |
| |
| |
| .. method:: tree_view(self) |
| |
| Print an ASCII tree representation of the DAG. |
| |
| |
| |
| |
| .. method:: add_task(self, task) |
| |
| Add a task to the DAG |
| |
| :param task: the task you want to add |
| :type task: task |
| |
| |
| |
| |
| .. method:: add_tasks(self, tasks) |
| |
| Add a list of tasks to the DAG |
| |
| :param tasks: a lit of tasks you want to add |
| :type tasks: list of tasks |
| |
| |
| |
| |
| .. method:: run(self, start_date=None, end_date=None, mark_success=False, local=False, executor=None, donot_pickle=conf.getboolean('core', 'donot_pickle'), ignore_task_deps=False, ignore_first_depends_on_past=True, pool=None, delay_on_limit_secs=1.0, verbose=False, conf=None, rerun_failed_tasks=False, run_backwards=False) |
| |
| Runs the DAG. |
| |
| :param start_date: the start date of the range to run |
| :type start_date: datetime.datetime |
| :param end_date: the end date of the range to run |
| :type end_date: datetime.datetime |
| :param mark_success: True to mark jobs as succeeded without running them |
| :type mark_success: bool |
| :param local: True to run the tasks using the LocalExecutor |
| :type local: bool |
| :param executor: The executor instance to run the tasks |
| :type executor: airflow.executor.base_executor.BaseExecutor |
| :param donot_pickle: True to avoid pickling DAG object and send to workers |
| :type donot_pickle: bool |
| :param ignore_task_deps: True to skip upstream tasks |
| :type ignore_task_deps: bool |
| :param ignore_first_depends_on_past: True to ignore depends_on_past |
| dependencies for the first set of tasks only |
| :type ignore_first_depends_on_past: bool |
| :param pool: Resource pool to use |
| :type pool: str |
| :param delay_on_limit_secs: Time in seconds to wait before next attempt to run |
| dag run when max_active_runs limit has been reached |
| :type delay_on_limit_secs: float |
| :param verbose: Make logging output more verbose |
| :type verbose: bool |
| :param conf: user defined dictionary passed from CLI |
| :type conf: dict |
| :param rerun_failed_tasks: |
| :type: bool |
| :param run_backwards: |
| :type: bool |
| |
| |
| |
| |
| .. method:: cli(self) |
| |
| Exposes a CLI specific to this DAG |
| |
| |
| |
| |
| .. method:: create_dagrun(self, state: DagRunState, execution_date: Optional[datetime] = None, run_id: Optional[str] = None, start_date: Optional[datetime] = None, external_trigger: Optional[bool] = False, conf: Optional[dict] = None, run_type: Optional[DagRunType] = None, session=None, dag_hash: Optional[str] = None, creating_job_id: Optional[int] = None) |
| |
| Creates a dag run from this dag including the tasks associated with this dag. |
| Returns the dag run. |
| |
| :param run_id: defines the run id for this dag run |
| :type run_id: str |
| :param run_type: type of DagRun |
| :type run_type: airflow.utils.types.DagRunType |
| :param execution_date: the execution date of this dag run |
| :type execution_date: datetime.datetime |
| :param state: the state of the dag run |
| :type state: airflow.utils.state.DagRunState |
| :param start_date: the date this dag run should be evaluated |
| :type start_date: datetime |
| :param external_trigger: whether this dag run is externally triggered |
| :type external_trigger: bool |
| :param conf: Dict containing configuration/parameters to pass to the DAG |
| :type conf: dict |
| :param creating_job_id: id of the job creating this DagRun |
| :type creating_job_id: int |
| :param session: database session |
| :type session: sqlalchemy.orm.session.Session |
| :param dag_hash: Hash of Serialized DAG |
| :type dag_hash: str |
| |
| |
| |
| |
| .. classmethod:: bulk_sync_to_db(cls, dags: Collection['DAG'], session=None) |
| |
| This method is deprecated in favor of bulk_write_to_db |
| |
| |
| |
| |
| .. classmethod:: bulk_write_to_db(cls, dags: Collection['DAG'], session=None) |
| |
| Ensure the DagModel rows for the given dags are up-to-date in the dag table in the DB, including |
| calculated fields. |
| |
| Note that this method can be called for both DAGs and SubDAGs. A SubDag is actually a SubDagOperator. |
| |
| :param dags: the DAG objects to save to the DB |
| :type dags: List[airflow.models.dag.DAG] |
| :return: None |
| |
| |
| |
| |
| .. method:: sync_to_db(self, session=None) |
| |
| Save attributes about this DAG to the DB. Note that this method |
| can be called for both DAGs and SubDAGs. A SubDag is actually a |
| SubDagOperator. |
| |
| :return: None |
| |
| |
| |
| |
| .. method:: get_default_view(self) |
| |
| This is only there for backward compatible jinja2 templates |
| |
| |
| |
| |
| .. staticmethod:: deactivate_unknown_dags(active_dag_ids, session=None) |
| |
| Given a list of known DAGs, deactivate any other DAGs that are |
| marked as active in the ORM |
| |
| :param active_dag_ids: list of DAG IDs that are active |
| :type active_dag_ids: list[unicode] |
| :return: None |
| |
| |
| |
| |
| .. staticmethod:: deactivate_stale_dags(expiration_date, session=None) |
| |
| Deactivate any DAGs that were last touched by the scheduler before |
| the expiration date. These DAGs were likely deleted. |
| |
| :param expiration_date: set inactive DAGs that were touched before this |
| time |
| :type expiration_date: datetime |
| :return: None |
| |
| |
| |
| |
| .. staticmethod:: get_num_task_instances(dag_id, task_ids=None, states=None, session=None) |
| |
| Returns the number of task instances in the given DAG. |
| |
| :param session: ORM session |
| :param dag_id: ID of the DAG to get the task concurrency of |
| :type dag_id: unicode |
| :param task_ids: A list of valid task IDs for the given DAG |
| :type task_ids: list[unicode] |
| :param states: A list of states to filter by if supplied |
| :type states: list[state] |
| :return: The number of running tasks |
| :rtype: int |
| |
| |
| |
| |
| .. classmethod:: get_serialized_fields(cls) |
| |
| Stringified DAGs and operators contain exactly these fields. |
| |
| |
| |
| |
| .. method:: get_edge_info(self, upstream_task_id: str, downstream_task_id: str) |
| |
| Returns edge information for the given pair of tasks if present, and |
| None if there is no information. |
| |
| |
| |
| |
| .. method:: set_edge_info(self, upstream_task_id: str, downstream_task_id: str, info: EdgeInfoType) |
| |
| Sets the given edge information on the DAG. Note that this will overwrite, |
| rather than merge with, existing info. |
| |
| |
| |
| |
| .. py:class:: DagModel(**kwargs) |
| |
| Bases: :class:`airflow.models.base.Base` |
| |
| Table containing DAG properties |
| |
| .. attribute:: __tablename__ |
| :annotation: = dag |
| |
| These items are stored in the database for state related information |
| |
| |
| .. attribute:: dag_id |
| |
| |
| |
| |
| .. attribute:: root_dag_id |
| |
| |
| |
| |
| .. attribute:: is_paused_at_creation |
| |
| |
| |
| |
| .. attribute:: is_paused |
| |
| |
| |
| |
| .. attribute:: is_subdag |
| |
| |
| |
| |
| .. attribute:: is_active |
| |
| |
| |
| |
| .. attribute:: last_parsed_time |
| |
| |
| |
| |
| .. attribute:: last_pickled |
| |
| |
| |
| |
| .. attribute:: last_expired |
| |
| |
| |
| |
| .. attribute:: scheduler_lock |
| |
| |
| |
| |
| .. attribute:: pickle_id |
| |
| |
| |
| |
| .. attribute:: fileloc |
| |
| |
| |
| |
| .. attribute:: owners |
| |
| |
| |
| |
| .. attribute:: description |
| |
| |
| |
| |
| .. attribute:: default_view |
| |
| |
| |
| |
| .. attribute:: schedule_interval |
| |
| |
| |
| |
| .. attribute:: tags |
| |
| |
| |
| |
| .. attribute:: concurrency |
| |
| |
| |
| |
| .. attribute:: max_active_runs |
| |
| |
| |
| |
| .. attribute:: has_task_concurrency_limits |
| |
| |
| |
| |
| .. attribute:: next_dagrun |
| |
| |
| |
| |
| .. attribute:: next_dagrun_create_after |
| |
| |
| |
| |
| .. attribute:: __table_args__ |
| |
| |
| |
| |
| .. attribute:: NUM_DAGS_PER_DAGRUN_QUERY |
| |
| |
| |
| |
| .. attribute:: timezone |
| |
| |
| |
| |
| .. attribute:: safe_dag_id |
| |
| |
| |
| |
| |
| .. method:: __repr__(self) |
| |
| |
| |
| |
| .. staticmethod:: get_dagmodel(dag_id, session=None) |
| |
| |
| |
| |
| .. classmethod:: get_current(cls, dag_id, session=None) |
| |
| |
| |
| |
| .. method:: get_last_dagrun(self, session=None, include_externally_triggered=False) |
| |
| |
| |
| |
| .. staticmethod:: get_paused_dag_ids(dag_ids: List[str], session: Session = None) |
| |
| Given a list of dag_ids, get a set of Paused Dag Ids |
| |
| :param dag_ids: List of Dag ids |
| :param session: ORM Session |
| :return: Paused Dag_ids |
| |
| |
| |
| |
| .. method:: get_default_view(self) |
| |
| Get the Default DAG View, returns the default config value if DagModel does not |
| have a value |
| |
| |
| |
| |
| .. method:: set_is_paused(self, is_paused: bool, including_subdags: bool = True, session=None) |
| |
| Pause/Un-pause a DAG. |
| |
| :param is_paused: Is the DAG paused |
| :param including_subdags: whether to include the DAG's subdags |
| :param session: session |
| |
| |
| |
| |
| .. classmethod:: deactivate_deleted_dags(cls, alive_dag_filelocs: List[str], session=None) |
| |
| Set ``is_active=False`` on the DAGs for which the DAG files have been removed. |
| Additionally change ``is_active=False`` to ``True`` if the DAG file exists. |
| |
| :param alive_dag_filelocs: file paths of alive DAGs |
| :param session: ORM Session |
| |
| |
| |
| |
| .. classmethod:: dags_needing_dagruns(cls, session: Session) |
| |
| Return (and lock) a list of Dag objects that are due to create a new DagRun. |
| |
| This will return a resultset of rows that is row-level-locked with a "SELECT ... FOR UPDATE" query, |
| you should ensure that any scheduling decisions are made in a single transaction -- as soon as the |
| transaction is committed it will be unlocked. |
| |
| |
| |
| |
| .. method:: calculate_dagrun_date_fields(self, dag: DAG, most_recent_dag_run: Optional[pendulum.DateTime]) |
| |
| Calculate ``next_dagrun`` and `next_dagrun_create_after`` |
| |
| :param dag: The DAG object |
| :param most_recent_dag_run: DateTime of most recent run of this dag, or none if not yet scheduled. |
| |
| |
| |
| |
| .. py:class:: DagTag |
| |
| Bases: :class:`airflow.models.base.Base` |
| |
| A tag name per dag, to allow quick filtering in the DAG view. |
| |
| .. attribute:: __tablename__ |
| :annotation: = dag_tag |
| |
| |
| |
| .. attribute:: name |
| |
| |
| |
| |
| .. attribute:: dag_id |
| |
| |
| |
| |
| |
| .. method:: __repr__(self) |
| |
| |
| |
| |
| .. py:class:: DagBag(dag_folder: Union[str, 'pathlib.Path', None] = None, include_examples: bool = conf.getboolean('core', 'LOAD_EXAMPLES'), include_smart_sensor: bool = conf.getboolean('smart_sensor', 'USE_SMART_SENSOR'), safe_mode: bool = conf.getboolean('core', 'DAG_DISCOVERY_SAFE_MODE'), read_dags_from_db: bool = False, store_serialized_dags: Optional[bool] = None, load_op_links: bool = True) |
| |
| Bases: :class:`airflow.utils.log.logging_mixin.LoggingMixin` |
| |
| A dagbag is a collection of dags, parsed out of a folder tree and has high |
| level configuration settings, like what database to use as a backend and |
| what executor to use to fire off tasks. This makes it easier to run |
| distinct environments for say production and development, tests, or for |
| different teams or security profiles. What would have been system level |
| settings are now dagbag level so that one system can run multiple, |
| independent settings sets. |
| |
| :param dag_folder: the folder to scan to find DAGs |
| :type dag_folder: unicode |
| :param include_examples: whether to include the examples that ship |
| with airflow or not |
| :type include_examples: bool |
| :param include_smart_sensor: whether to include the smart sensor native |
| DAGs that create the smart sensor operators for whole cluster |
| :type include_smart_sensor: bool |
| :param read_dags_from_db: Read DAGs from DB if ``True`` is passed. |
| If ``False`` DAGs are read from python files. |
| :type read_dags_from_db: bool |
| :param load_op_links: Should the extra operator link be loaded via plugins when |
| de-serializing the DAG? This flag is set to False in Scheduler so that Extra Operator links |
| are not loaded to not run User code in Scheduler. |
| :type load_op_links: bool |
| |
| .. attribute:: DAGBAG_IMPORT_TIMEOUT |
| |
| |
| |
| |
| .. attribute:: SCHEDULER_ZOMBIE_TASK_THRESHOLD |
| |
| |
| |
| |
| .. attribute:: store_serialized_dags |
| |
| |
| Whether or not to read dags from DB |
| |
| |
| .. attribute:: dag_ids |
| |
| |
| :return: a list of DAG IDs in this bag |
| :rtype: List[unicode] |
| |
| |
| |
| .. method:: size(self) |
| |
| :return: the amount of dags contained in this dagbag |
| |
| |
| |
| |
| .. method:: get_dag(self, dag_id, session: Session = None) |
| |
| Gets the DAG out of the dictionary, and refreshes it if expired |
| |
| :param dag_id: DAG Id |
| :type dag_id: str |
| |
| |
| |
| |
| .. method:: _add_dag_from_db(self, dag_id: str, session: Session) |
| |
| Add DAG to DagBag from DB |
| |
| |
| |
| |
| .. method:: process_file(self, filepath, only_if_updated=True, safe_mode=True) |
| |
| Given a path to a python module or zip file, this method imports |
| the module and look for dag objects within it. |
| |
| |
| |
| |
| .. method:: _load_modules_from_file(self, filepath, safe_mode) |
| |
| |
| |
| |
| .. method:: _load_modules_from_zip(self, filepath, safe_mode) |
| |
| |
| |
| |
| .. method:: _process_modules(self, filepath, mods, file_last_changed_on_disk) |
| |
| |
| |
| |
| .. method:: bag_dag(self, dag, root_dag) |
| |
| Adds the DAG into the bag, recurses into sub dags. |
| |
| :raises: AirflowDagCycleException if a cycle is detected in this dag or its subdags. |
| :raises: AirflowDagDuplicatedIdException if this dag or its subdags already exists in the bag. |
| |
| |
| |
| |
| .. method:: _bag_dag(self, *, dag, root_dag, recursive) |
| |
| Actual implementation of bagging a dag. |
| |
| The only purpose of this is to avoid exposing ``recursive`` in ``bag_dag()``, |
| intended to only be used by the ``_bag_dag()`` implementation. |
| |
| |
| |
| |
| .. method:: collect_dags(self, dag_folder: Union[str, 'pathlib.Path', None] = None, only_if_updated: bool = True, include_examples: bool = conf.getboolean('core', 'LOAD_EXAMPLES'), include_smart_sensor: bool = conf.getboolean('smart_sensor', 'USE_SMART_SENSOR'), safe_mode: bool = conf.getboolean('core', 'DAG_DISCOVERY_SAFE_MODE')) |
| |
| Given a file path or a folder, this method looks for python modules, |
| imports them and adds them to the dagbag collection. |
| |
| Note that if a ``.airflowignore`` file is found while processing |
| the directory, it will behave much like a ``.gitignore``, |
| ignoring files that match any of the regex patterns specified |
| in the file. |
| |
| **Note**: The patterns in .airflowignore are treated as |
| un-anchored regexes, not shell-like glob patterns. |
| |
| |
| |
| |
| .. method:: collect_dags_from_db(self) |
| |
| Collects DAGs from database. |
| |
| |
| |
| |
| .. method:: dagbag_report(self) |
| |
| Prints a report around DagBag loading stats |
| |
| |
| |
| |
| .. method:: sync_to_db(self, session: Optional[Session] = None) |
| |
| Save attributes about list of DAG to the DB. |
| |
| |
| |
| |
| .. method:: _sync_perm_for_dag(self, dag, session: Optional[Session] = None) |
| |
| Sync DAG specific permissions, if necessary |
| |
| |
| |
| |
| .. py:class:: DagPickle(dag) |
| |
| Bases: :class:`airflow.models.base.Base` |
| |
| Dags can originate from different places (user repos, main repo, ...) |
| and also get executed in different places (different executors). This |
| object represents a version of a DAG and becomes a source of truth for |
| a BackfillJob execution. A pickle is a native python serialized object, |
| and in this case gets stored in the database for the duration of the job. |
| |
| The executors pick up the DagPickle id and read the dag definition from |
| the database. |
| |
| .. attribute:: id |
| |
| |
| |
| |
| .. attribute:: pickle |
| |
| |
| |
| |
| .. attribute:: created_dttm |
| |
| |
| |
| |
| .. attribute:: pickle_hash |
| |
| |
| |
| |
| .. attribute:: __tablename__ |
| :annotation: = dag_pickle |
| |
| |
| |
| |
| .. py:class:: DagRun(dag_id: Optional[str] = None, run_id: Optional[str] = None, queued_at: Optional[datetime] = __NO_VALUE, execution_date: Optional[datetime] = None, start_date: Optional[datetime] = None, external_trigger: Optional[bool] = None, conf: Optional[Any] = None, state: Optional[DagRunState] = None, run_type: Optional[str] = None, dag_hash: Optional[str] = None, creating_job_id: Optional[int] = None) |
| |
| Bases: :class:`airflow.models.base.Base`, :class:`airflow.utils.log.logging_mixin.LoggingMixin` |
| |
| DagRun describes an instance of a Dag. It can be created |
| by the scheduler (for regular runs) or by an external trigger |
| |
| .. attribute:: __tablename__ |
| :annotation: = dag_run |
| |
| |
| |
| .. attribute:: __NO_VALUE |
| |
| |
| |
| |
| .. attribute:: id |
| |
| |
| |
| |
| .. attribute:: dag_id |
| |
| |
| |
| |
| .. attribute:: queued_at |
| |
| |
| |
| |
| .. attribute:: execution_date |
| |
| |
| |
| |
| .. attribute:: start_date |
| |
| |
| |
| |
| .. attribute:: end_date |
| |
| |
| |
| |
| .. attribute:: _state |
| |
| |
| |
| |
| .. attribute:: run_id |
| |
| |
| |
| |
| .. attribute:: creating_job_id |
| |
| |
| |
| |
| .. attribute:: external_trigger |
| |
| |
| |
| |
| .. attribute:: run_type |
| |
| |
| |
| |
| .. attribute:: conf |
| |
| |
| |
| |
| .. attribute:: last_scheduling_decision |
| |
| |
| |
| |
| .. attribute:: dag_hash |
| |
| |
| |
| |
| .. attribute:: dag |
| |
| |
| |
| |
| .. attribute:: __table_args__ |
| |
| |
| |
| |
| .. attribute:: task_instances |
| |
| |
| |
| |
| .. attribute:: DEFAULT_DAGRUNS_TO_EXAMINE |
| |
| |
| |
| |
| .. attribute:: state |
| |
| |
| |
| |
| .. attribute:: is_backfill |
| |
| |
| |
| |
| |
| .. method:: __repr__(self) |
| |
| |
| |
| |
| .. method:: get_state(self) |
| |
| |
| |
| |
| .. method:: set_state(self, state: DagRunState) |
| |
| |
| |
| |
| .. method:: refresh_from_db(self, session: Session = None) |
| |
| Reloads the current dagrun from the database |
| |
| :param session: database session |
| :type session: Session |
| |
| |
| |
| |
| .. classmethod:: next_dagruns_to_examine(cls, state: DagRunState, session: Session, max_number: Optional[int] = None) |
| |
| Return the next DagRuns that the scheduler should attempt to schedule. |
| |
| This will return zero or more DagRun rows that are row-level-locked with a "SELECT ... FOR UPDATE" |
| query, you should ensure that any scheduling decisions are made in a single transaction -- as soon as |
| the transaction is committed it will be unlocked. |
| |
| :rtype: list[airflow.models.DagRun] |
| |
| |
| |
| |
| .. staticmethod:: find(dag_id: Optional[Union[str, List[str]]] = None, run_id: Optional[str] = None, execution_date: Optional[datetime] = None, state: Optional[DagRunState] = None, external_trigger: Optional[bool] = None, no_backfills: bool = False, run_type: Optional[DagRunType] = None, session: Session = None, execution_start_date: Optional[datetime] = None, execution_end_date: Optional[datetime] = None) |
| |
| Returns a set of dag runs for the given search criteria. |
| |
| :param dag_id: the dag_id or list of dag_id to find dag runs for |
| :type dag_id: str or list[str] |
| :param run_id: defines the run id for this dag run |
| :type run_id: str |
| :param run_type: type of DagRun |
| :type run_type: airflow.utils.types.DagRunType |
| :param execution_date: the execution date |
| :type execution_date: datetime.datetime or list[datetime.datetime] |
| :param state: the state of the dag run |
| :type state: DagRunState |
| :param external_trigger: whether this dag run is externally triggered |
| :type external_trigger: bool |
| :param no_backfills: return no backfills (True), return all (False). |
| Defaults to False |
| :type no_backfills: bool |
| :param session: database session |
| :type session: sqlalchemy.orm.session.Session |
| :param execution_start_date: dag run that was executed from this date |
| :type execution_start_date: datetime.datetime |
| :param execution_end_date: dag run that was executed until this date |
| :type execution_end_date: datetime.datetime |
| |
| |
| |
| |
| .. staticmethod:: generate_run_id(run_type: DagRunType, execution_date: datetime) |
| |
| Generate Run ID based on Run Type and Execution Date |
| |
| |
| |
| |
| .. method:: get_task_instances(self, state: Optional[Iterable[TaskInstanceState]] = None, session=None) |
| |
| Returns the task instances for this dag run |
| |
| |
| |
| |
| .. method:: get_task_instance(self, task_id: str, session: Session = None) |
| |
| Returns the task instance specified by task_id for this dag run |
| |
| :param task_id: the task id |
| :type task_id: str |
| :param session: Sqlalchemy ORM Session |
| :type session: Session |
| |
| |
| |
| |
| .. method:: get_dag(self) |
| |
| Returns the Dag associated with this DagRun. |
| |
| :return: DAG |
| |
| |
| |
| |
| .. method:: get_previous_dagrun(self, state: Optional[DagRunState] = None, session: Session = None) |
| |
| The previous DagRun, if there is one |
| |
| |
| |
| |
| .. method:: get_previous_scheduled_dagrun(self, session: Session = None) |
| |
| The previous, SCHEDULED DagRun, if there is one |
| |
| |
| |
| |
| .. method:: update_state(self, session: Session = None, execute_callbacks: bool = True) |
| |
| Determines the overall state of the DagRun based on the state |
| of its TaskInstances. |
| |
| :param session: Sqlalchemy ORM Session |
| :type session: Session |
| :param execute_callbacks: Should dag callbacks (success/failure, SLA etc) be invoked |
| directly (default: true) or recorded as a pending request in the ``callback`` property |
| :type execute_callbacks: bool |
| :return: Tuple containing tis that can be scheduled in the current loop & `callback` that |
| needs to be executed |
| |
| |
| |
| |
| .. method:: task_instance_scheduling_decisions(self, session: Session = None) |
| |
| |
| |
| |
| .. method:: _get_ready_tis(self, scheduleable_tasks: List[TI], finished_tasks: List[TI], session: Session) |
| |
| |
| |
| |
| .. method:: _are_premature_tis(self, unfinished_tasks: List[TI], finished_tasks: List[TI], session: Session) |
| |
| |
| |
| |
| .. method:: _emit_true_scheduling_delay_stats_for_finished_state(self, finished_tis) |
| |
| This is a helper method to emit the true scheduling delay stats, which is defined as |
| the time when the first task in DAG starts minus the expected DAG run datetime. |
| This method will be used in the update_state method when the state of the DagRun |
| is updated to a completed status (either success or failure). The method will find the first |
| started task within the DAG and calculate the expected DagRun start time (based on |
| dag.execution_date & dag.schedule_interval), and minus these two values to get the delay. |
| The emitted data may contains outlier (e.g. when the first task was cleared, so |
| the second task's start_date will be used), but we can get rid of the outliers |
| on the stats side through the dashboards tooling built. |
| Note, the stat will only be emitted if the DagRun is a scheduler triggered one |
| (i.e. external_trigger is False). |
| |
| |
| |
| |
| .. method:: _emit_duration_stats_for_finished_state(self) |
| |
| |
| |
| |
| .. method:: verify_integrity(self, session: Session = None) |
| |
| Verifies the DagRun by checking for removed tasks or tasks that are not in the |
| database yet. It will set state to removed or add the task if required. |
| |
| :param session: Sqlalchemy ORM Session |
| :type session: Session |
| |
| |
| |
| |
| .. staticmethod:: get_run(session: Session, dag_id: str, execution_date: datetime) |
| |
| Get a single DAG Run |
| |
| :param session: Sqlalchemy ORM Session |
| :type session: Session |
| :param dag_id: DAG ID |
| :type dag_id: unicode |
| :param execution_date: execution date |
| :type execution_date: datetime |
| :return: DagRun corresponding to the given dag_id and execution date |
| if one exists. None otherwise. |
| :rtype: airflow.models.DagRun |
| |
| |
| |
| |
| .. classmethod:: get_latest_runs(cls, session=None) |
| |
| Returns the latest DagRun for each DAG |
| |
| |
| |
| |
| .. method:: schedule_tis(self, schedulable_tis: Iterable[TI], session: Session = None) |
| |
| Set the given task instances in to the scheduled state. |
| |
| Each element of ``schedulable_tis`` should have it's ``task`` attribute already set. |
| |
| Any DummyOperator without callbacks is instead set straight to the success state. |
| |
| All the TIs should belong to this DagRun, but this code is in the hot-path, this is not checked -- it |
| is the caller's responsibility to call this function only with TIs from a single dag run. |
| |
| |
| |
| |
| .. py:class:: ImportError |
| |
| Bases: :class:`airflow.models.base.Base` |
| |
| A table to store all Import Errors. The ImportErrors are recorded when parsing DAGs. |
| This errors are displayed on the Webserver. |
| |
| .. attribute:: __tablename__ |
| :annotation: = import_error |
| |
| |
| |
| .. attribute:: id |
| |
| |
| |
| |
| .. attribute:: timestamp |
| |
| |
| |
| |
| .. attribute:: filename |
| |
| |
| |
| |
| .. attribute:: stacktrace |
| |
| |
| |
| |
| |
| .. py:class:: Log(event, task_instance=None, owner=None, extra=None, **kwargs) |
| |
| Bases: :class:`airflow.models.base.Base` |
| |
| Used to actively log events to the database |
| |
| .. attribute:: __tablename__ |
| :annotation: = log |
| |
| |
| |
| .. attribute:: id |
| |
| |
| |
| |
| .. attribute:: dttm |
| |
| |
| |
| |
| .. attribute:: dag_id |
| |
| |
| |
| |
| .. attribute:: task_id |
| |
| |
| |
| |
| .. attribute:: event |
| |
| |
| |
| |
| .. attribute:: execution_date |
| |
| |
| |
| |
| .. attribute:: owner |
| |
| |
| |
| |
| .. attribute:: extra |
| |
| |
| |
| |
| .. attribute:: __table_args__ |
| |
| |
| |
| |
| |
| .. py:class:: Pool |
| |
| Bases: :class:`airflow.models.base.Base` |
| |
| the class to get Pool info. |
| |
| .. attribute:: __tablename__ |
| :annotation: = slot_pool |
| |
| |
| |
| .. attribute:: id |
| |
| |
| |
| |
| .. attribute:: pool |
| |
| |
| |
| |
| .. attribute:: slots |
| |
| |
| |
| |
| .. attribute:: description |
| |
| |
| |
| |
| .. attribute:: DEFAULT_POOL_NAME |
| :annotation: = default_pool |
| |
| |
| |
| |
| .. method:: __repr__(self) |
| |
| |
| |
| |
| .. staticmethod:: get_pool(pool_name, session: Session = None) |
| |
| Get the Pool with specific pool name from the Pools. |
| |
| :param pool_name: The pool name of the Pool to get. |
| :param session: SQLAlchemy ORM Session |
| :return: the pool object |
| |
| |
| |
| |
| .. staticmethod:: get_default_pool(session: Session = None) |
| |
| Get the Pool of the default_pool from the Pools. |
| |
| :param session: SQLAlchemy ORM Session |
| :return: the pool object |
| |
| |
| |
| |
| .. staticmethod:: slots_stats(*, lock_rows: bool = False, session: Session = None) |
| |
| Get Pool stats (Number of Running, Queued, Open & Total tasks) |
| |
| If ``lock_rows`` is True, and the database engine in use supports the ``NOWAIT`` syntax, then a |
| non-blocking lock will be attempted -- if the lock is not available then SQLAlchemy will throw an |
| OperationalError. |
| |
| :param lock_rows: Should we attempt to obtain a row-level lock on all the Pool rows returns |
| :param session: SQLAlchemy ORM Session |
| |
| |
| |
| |
| .. method:: to_json(self) |
| |
| Get the Pool in a json structure |
| |
| :return: the pool object in json format |
| |
| |
| |
| |
| .. method:: occupied_slots(self, session: Session) |
| |
| Get the number of slots used by running/queued tasks at the moment. |
| |
| :param session: SQLAlchemy ORM Session |
| :return: the used number of slots |
| |
| |
| |
| |
| .. method:: running_slots(self, session: Session) |
| |
| Get the number of slots used by running tasks at the moment. |
| |
| :param session: SQLAlchemy ORM Session |
| :return: the used number of slots |
| |
| |
| |
| |
| .. method:: queued_slots(self, session: Session) |
| |
| Get the number of slots used by queued tasks at the moment. |
| |
| :param session: SQLAlchemy ORM Session |
| :return: the used number of slots |
| |
| |
| |
| |
| .. method:: open_slots(self, session: Session) |
| |
| Get the number of slots open at the moment. |
| |
| :param session: SQLAlchemy ORM Session |
| :return: the number of slots |
| |
| |
| |
| |
| .. py:class:: RenderedTaskInstanceFields(ti: TaskInstance, render_templates=True) |
| |
| Bases: :class:`airflow.models.base.Base` |
| |
| Save Rendered Template Fields |
| |
| .. attribute:: __tablename__ |
| :annotation: = rendered_task_instance_fields |
| |
| |
| |
| .. attribute:: dag_id |
| |
| |
| |
| |
| .. attribute:: task_id |
| |
| |
| |
| |
| .. attribute:: execution_date |
| |
| |
| |
| |
| .. attribute:: rendered_fields |
| |
| |
| |
| |
| .. attribute:: k8s_pod_yaml |
| |
| |
| |
| |
| |
| .. method:: __repr__(self) |
| |
| |
| |
| |
| .. method:: _redact(self) |
| |
| |
| |
| |
| .. classmethod:: get_templated_fields(cls, ti: TaskInstance, session: Session = None) |
| |
| Get templated field for a TaskInstance from the RenderedTaskInstanceFields |
| table. |
| |
| :param ti: Task Instance |
| :param session: SqlAlchemy Session |
| :return: Rendered Templated TI field |
| |
| |
| |
| |
| .. classmethod:: get_k8s_pod_yaml(cls, ti: TaskInstance, session: Session = None) |
| |
| Get rendered Kubernetes Pod Yaml for a TaskInstance from the RenderedTaskInstanceFields |
| table. |
| |
| :param ti: Task Instance |
| :param session: SqlAlchemy Session |
| :return: Kubernetes Pod Yaml |
| |
| |
| |
| |
| .. method:: write(self, session: Session = None) |
| |
| Write instance to database |
| |
| :param session: SqlAlchemy Session |
| |
| |
| |
| |
| .. classmethod:: delete_old_records(cls, task_id: str, dag_id: str, num_to_keep=conf.getint('core', 'max_num_rendered_ti_fields_per_task', fallback=0), session: Session = None) |
| |
| Keep only Last X (num_to_keep) number of records for a task by deleting others |
| |
| :param task_id: Task ID |
| :param dag_id: Dag ID |
| :param num_to_keep: Number of Records to keep |
| :param session: SqlAlchemy Session |
| |
| |
| |
| |
| .. py:class:: SensorInstance(ti) |
| |
| Bases: :class:`airflow.models.base.Base` |
| |
| SensorInstance support the smart sensor service. It stores the sensor task states |
| and context that required for poking include poke context and execution context. |
| In sensor_instance table we also save the sensor operator classpath so that inside |
| smart sensor there is no need to import the dagbag and create task object for each |
| sensor task. |
| |
| SensorInstance include another set of columns to support the smart sensor shard on |
| large number of sensor instance. The key idea is to generate the hash code from the |
| poke context and use it to map to a shorter shard code which can be used as an index. |
| Every smart sensor process takes care of tasks whose `shardcode` are in a certain range. |
| |
| .. attribute:: __tablename__ |
| :annotation: = sensor_instance |
| |
| |
| |
| .. attribute:: id |
| |
| |
| |
| |
| .. attribute:: task_id |
| |
| |
| |
| |
| .. attribute:: dag_id |
| |
| |
| |
| |
| .. attribute:: execution_date |
| |
| |
| |
| |
| .. attribute:: state |
| |
| |
| |
| |
| .. attribute:: _try_number |
| |
| |
| |
| |
| .. attribute:: start_date |
| |
| |
| |
| |
| .. attribute:: operator |
| |
| |
| |
| |
| .. attribute:: op_classpath |
| |
| |
| |
| |
| .. attribute:: hashcode |
| |
| |
| |
| |
| .. attribute:: shardcode |
| |
| |
| |
| |
| .. attribute:: poke_context |
| |
| |
| |
| |
| .. attribute:: execution_context |
| |
| |
| |
| |
| .. attribute:: created_at |
| |
| |
| |
| |
| .. attribute:: updated_at |
| |
| |
| |
| |
| .. attribute:: __table_args__ |
| |
| |
| |
| |
| .. attribute:: try_number |
| |
| |
| Return the try number that this task number will be when it is actually |
| run. |
| If the TI is currently running, this will match the column in the |
| database, in all other cases this will be incremented. |
| |
| |
| |
| .. staticmethod:: get_classpath(obj) |
| |
| Get the object dotted class path. Used for getting operator classpath. |
| |
| :param obj: |
| :type obj: |
| :return: The class path of input object |
| :rtype: str |
| |
| |
| |
| |
| .. classmethod:: register(cls, ti, poke_context, execution_context, session=None) |
| |
| Register task instance ti for a sensor in sensor_instance table. Persist the |
| context used for a sensor and set the sensor_instance table state to sensing. |
| |
| :param ti: The task instance for the sensor to be registered. |
| :type: ti: |
| :param poke_context: Context used for sensor poke function. |
| :type poke_context: dict |
| :param execution_context: Context used for execute sensor such as timeout |
| setting and email configuration. |
| :type execution_context: dict |
| :param session: SQLAlchemy ORM Session |
| :type session: Session |
| :return: True if the ti was registered successfully. |
| :rtype: Boolean |
| |
| |
| |
| |
| .. method:: __repr__(self) |
| |
| |
| |
| |
| .. py:class:: SkipMixin |
| |
| Bases: :class:`airflow.utils.log.logging_mixin.LoggingMixin` |
| |
| A Mixin to skip Tasks Instances |
| |
| |
| .. method:: _set_state_to_skipped(self, dag_run, execution_date, tasks, session) |
| |
| Used internally to set state of task instances to skipped from the same dag run. |
| |
| |
| |
| |
| .. method:: skip(self, dag_run, execution_date, tasks, session=None) |
| |
| Sets tasks instances to skipped from the same dag run. |
| |
| If this instance has a `task_id` attribute, store the list of skipped task IDs to XCom |
| so that NotPreviouslySkippedDep knows these tasks should be skipped when they |
| are cleared. |
| |
| :param dag_run: the DagRun for which to set the tasks to skipped |
| :param execution_date: execution_date |
| :param tasks: tasks to skip (not task_ids) |
| :param session: db session to use |
| |
| |
| |
| |
| .. method:: skip_all_except(self, ti: TaskInstance, branch_task_ids: Union[str, Iterable[str]]) |
| |
| This method implements the logic for a branching operator; given a single |
| task ID or list of task IDs to follow, this skips all other tasks |
| immediately downstream of this operator. |
| |
| branch_task_ids is stored to XCom so that NotPreviouslySkippedDep knows skipped tasks or |
| newly added tasks should be skipped when they are cleared. |
| |
| |
| |
| |
| .. py:class:: SlaMiss |
| |
| Bases: :class:`airflow.models.base.Base` |
| |
| Model that stores a history of the SLA that have been missed. |
| It is used to keep track of SLA failures over time and to avoid double |
| triggering alert emails. |
| |
| .. attribute:: __tablename__ |
| :annotation: = sla_miss |
| |
| |
| |
| .. attribute:: task_id |
| |
| |
| |
| |
| .. attribute:: dag_id |
| |
| |
| |
| |
| .. attribute:: execution_date |
| |
| |
| |
| |
| .. attribute:: email_sent |
| |
| |
| |
| |
| .. attribute:: timestamp |
| |
| |
| |
| |
| .. attribute:: description |
| |
| |
| |
| |
| .. attribute:: notification_sent |
| |
| |
| |
| |
| .. attribute:: __table_args__ |
| |
| |
| |
| |
| |
| .. method:: __repr__(self) |
| |
| |
| |
| |
| .. py:class:: TaskFail(task, execution_date, start_date, end_date) |
| |
| Bases: :class:`airflow.models.base.Base` |
| |
| TaskFail tracks the failed run durations of each task instance. |
| |
| .. attribute:: __tablename__ |
| :annotation: = task_fail |
| |
| |
| |
| .. attribute:: id |
| |
| |
| |
| |
| .. attribute:: task_id |
| |
| |
| |
| |
| .. attribute:: dag_id |
| |
| |
| |
| |
| .. attribute:: execution_date |
| |
| |
| |
| |
| .. attribute:: start_date |
| |
| |
| |
| |
| .. attribute:: end_date |
| |
| |
| |
| |
| .. attribute:: duration |
| |
| |
| |
| |
| .. attribute:: __table_args__ |
| |
| |
| |
| |
| |
| .. py:class:: TaskInstance(task, execution_date: datetime, state: Optional[str] = None) |
| |
| Bases: :class:`airflow.models.base.Base`, :class:`airflow.utils.log.logging_mixin.LoggingMixin` |
| |
| Task instances store the state of a task instance. This table is the |
| authority and single source of truth around what tasks have run and the |
| state they are in. |
| |
| The SqlAlchemy model doesn't have a SqlAlchemy foreign key to the task or |
| dag model deliberately to have more control over transactions. |
| |
| Database transactions on this table should insure double triggers and |
| any confusion around what task instances are or aren't ready to run |
| even while multiple schedulers may be firing task instances. |
| |
| .. attribute:: __tablename__ |
| :annotation: = task_instance |
| |
| |
| |
| .. attribute:: task_id |
| |
| |
| |
| |
| .. attribute:: dag_id |
| |
| |
| |
| |
| .. attribute:: execution_date |
| |
| |
| |
| |
| .. attribute:: start_date |
| |
| |
| |
| |
| .. attribute:: end_date |
| |
| |
| |
| |
| .. attribute:: duration |
| |
| |
| |
| |
| .. attribute:: state |
| |
| |
| |
| |
| .. attribute:: _try_number |
| |
| |
| |
| |
| .. attribute:: max_tries |
| |
| |
| |
| |
| .. attribute:: hostname |
| |
| |
| |
| |
| .. attribute:: unixname |
| |
| |
| |
| |
| .. attribute:: job_id |
| |
| |
| |
| |
| .. attribute:: pool |
| |
| |
| |
| |
| .. attribute:: pool_slots |
| |
| |
| |
| |
| .. attribute:: queue |
| |
| |
| |
| |
| .. attribute:: priority_weight |
| |
| |
| |
| |
| .. attribute:: operator |
| |
| |
| |
| |
| .. attribute:: queued_dttm |
| |
| |
| |
| |
| .. attribute:: queued_by_job_id |
| |
| |
| |
| |
| .. attribute:: pid |
| |
| |
| |
| |
| .. attribute:: executor_config |
| |
| |
| |
| |
| .. attribute:: external_executor_id |
| |
| |
| |
| |
| .. attribute:: __table_args__ |
| |
| |
| |
| |
| .. attribute:: dag_model |
| |
| |
| |
| |
| .. attribute:: try_number |
| |
| |
| Return the try number that this task number will be when it is actually |
| run. |
| |
| If the TaskInstance is currently running, this will match the column in the |
| database, in all other cases this will be incremented. |
| |
| |
| .. attribute:: prev_attempted_tries |
| |
| |
| Based on this instance's try_number, this will calculate |
| the number of previously attempted tries, defaulting to 0. |
| |
| |
| .. attribute:: next_try_number |
| |
| |
| Setting Next Try Number |
| |
| |
| .. attribute:: log_filepath |
| |
| |
| Filepath for TaskInstance |
| |
| |
| .. attribute:: log_url |
| |
| |
| Log URL for TaskInstance |
| |
| |
| .. attribute:: mark_success_url |
| |
| |
| URL to mark TI success |
| |
| |
| .. attribute:: key |
| |
| |
| Returns a tuple that identifies the task instance uniquely |
| |
| |
| .. attribute:: is_premature |
| |
| |
| Returns whether a task is in UP_FOR_RETRY state and its retry interval |
| has elapsed. |
| |
| |
| .. attribute:: previous_ti |
| |
| |
| This attribute is deprecated. |
| Please use `airflow.models.taskinstance.TaskInstance.get_previous_ti` method. |
| |
| |
| .. attribute:: previous_ti_success |
| |
| |
| This attribute is deprecated. |
| Please use `airflow.models.taskinstance.TaskInstance.get_previous_ti` method. |
| |
| |
| .. attribute:: previous_start_date_success |
| |
| |
| This attribute is deprecated. |
| Please use `airflow.models.taskinstance.TaskInstance.get_previous_start_date` method. |
| |
| |
| |
| .. method:: init_on_load(self) |
| |
| Initialize the attributes that aren't stored in the DB |
| |
| |
| |
| |
| .. method:: command_as_list(self, mark_success=False, ignore_all_deps=False, ignore_task_deps=False, ignore_depends_on_past=False, ignore_ti_state=False, local=False, pickle_id=None, raw=False, job_id=None, pool=None, cfg_path=None) |
| |
| Returns a command that can be executed anywhere where airflow is |
| installed. This command is part of the message sent to executors by |
| the orchestrator. |
| |
| |
| |
| |
| .. staticmethod:: generate_command(dag_id: str, task_id: str, execution_date: datetime, mark_success: bool = False, ignore_all_deps: bool = False, ignore_depends_on_past: bool = False, ignore_task_deps: bool = False, ignore_ti_state: bool = False, local: bool = False, pickle_id: Optional[int] = None, file_path: Optional[str] = None, raw: bool = False, job_id: Optional[str] = None, pool: Optional[str] = None, cfg_path: Optional[str] = None) |
| |
| Generates the shell command required to execute this task instance. |
| |
| :param dag_id: DAG ID |
| :type dag_id: str |
| :param task_id: Task ID |
| :type task_id: str |
| :param execution_date: Execution date for the task |
| :type execution_date: datetime |
| :param mark_success: Whether to mark the task as successful |
| :type mark_success: bool |
| :param ignore_all_deps: Ignore all ignorable dependencies. |
| Overrides the other ignore_* parameters. |
| :type ignore_all_deps: bool |
| :param ignore_depends_on_past: Ignore depends_on_past parameter of DAGs |
| (e.g. for Backfills) |
| :type ignore_depends_on_past: bool |
| :param ignore_task_deps: Ignore task-specific dependencies such as depends_on_past |
| and trigger rule |
| :type ignore_task_deps: bool |
| :param ignore_ti_state: Ignore the task instance's previous failure/success |
| :type ignore_ti_state: bool |
| :param local: Whether to run the task locally |
| :type local: bool |
| :param pickle_id: If the DAG was serialized to the DB, the ID |
| associated with the pickled DAG |
| :type pickle_id: Optional[int] |
| :param file_path: path to the file containing the DAG definition |
| :type file_path: Optional[str] |
| :param raw: raw mode (needs more details) |
| :type raw: Optional[bool] |
| :param job_id: job ID (needs more details) |
| :type job_id: Optional[int] |
| :param pool: the Airflow pool that the task should run in |
| :type pool: Optional[str] |
| :param cfg_path: the Path to the configuration file |
| :type cfg_path: Optional[str] |
| :return: shell command that can be used to run the task instance |
| :rtype: list[str] |
| |
| |
| |
| |
| .. method:: current_state(self, session=None) |
| |
| Get the very latest state from the database, if a session is passed, |
| we use and looking up the state becomes part of the session, otherwise |
| a new session is used. |
| |
| :param session: SQLAlchemy ORM Session |
| :type session: Session |
| |
| |
| |
| |
| .. method:: error(self, session=None) |
| |
| Forces the task instance's state to FAILED in the database. |
| |
| :param session: SQLAlchemy ORM Session |
| :type session: Session |
| |
| |
| |
| |
| .. method:: refresh_from_db(self, session=None, lock_for_update=False) |
| |
| Refreshes the task instance from the database based on the primary key |
| |
| :param session: SQLAlchemy ORM Session |
| :type session: Session |
| :param lock_for_update: if True, indicates that the database should |
| lock the TaskInstance (issuing a FOR UPDATE clause) until the |
| session is committed. |
| :type lock_for_update: bool |
| |
| |
| |
| |
| .. method:: refresh_from_task(self, task, pool_override=None) |
| |
| Copy common attributes from the given task. |
| |
| :param task: The task object to copy from |
| :type task: airflow.models.BaseOperator |
| :param pool_override: Use the pool_override instead of task's pool |
| :type pool_override: str |
| |
| |
| |
| |
| .. method:: clear_xcom_data(self, session=None) |
| |
| Clears all XCom data from the database for the task instance |
| |
| :param session: SQLAlchemy ORM Session |
| :type session: Session |
| |
| |
| |
| |
| .. method:: set_state(self, state: str, session=None) |
| |
| Set TaskInstance state. |
| |
| :param state: State to set for the TI |
| :type state: str |
| :param session: SQLAlchemy ORM Session |
| :type session: Session |
| |
| |
| |
| |
| .. method:: are_dependents_done(self, session=None) |
| |
| Checks whether the immediate dependents of this task instance have succeeded or have been skipped. |
| This is meant to be used by wait_for_downstream. |
| |
| This is useful when you do not want to start processing the next |
| schedule of a task until the dependents are done. For instance, |
| if the task DROPs and recreates a table. |
| |
| :param session: SQLAlchemy ORM Session |
| :type session: Session |
| |
| |
| |
| |
| .. method:: get_previous_ti(self, state: Optional[str] = None, session: Session = None) |
| |
| The task instance for the task that ran before this task instance. |
| |
| :param state: If passed, it only take into account instances of a specific state. |
| :param session: SQLAlchemy ORM Session |
| |
| |
| |
| |
| .. method:: get_previous_execution_date(self, state: Optional[str] = None, session: Session = None) |
| |
| The execution date from property previous_ti_success. |
| |
| :param state: If passed, it only take into account instances of a specific state. |
| :param session: SQLAlchemy ORM Session |
| |
| |
| |
| |
| .. method:: get_previous_start_date(self, state: Optional[str] = None, session: Session = None) |
| |
| The start date from property previous_ti_success. |
| |
| :param state: If passed, it only take into account instances of a specific state. |
| :param session: SQLAlchemy ORM Session |
| |
| |
| |
| |
| .. method:: are_dependencies_met(self, dep_context=None, session=None, verbose=False) |
| |
| Returns whether or not all the conditions are met for this task instance to be run |
| given the context for the dependencies (e.g. a task instance being force run from |
| the UI will ignore some dependencies). |
| |
| :param dep_context: The execution context that determines the dependencies that |
| should be evaluated. |
| :type dep_context: DepContext |
| :param session: database session |
| :type session: sqlalchemy.orm.session.Session |
| :param verbose: whether log details on failed dependencies on |
| info or debug log level |
| :type verbose: bool |
| |
| |
| |
| |
| .. method:: get_failed_dep_statuses(self, dep_context=None, session=None) |
| |
| Get failed Dependencies |
| |
| |
| |
| |
| .. method:: __repr__(self) |
| |
| |
| |
| |
| .. method:: next_retry_datetime(self) |
| |
| Get datetime of the next retry if the task instance fails. For exponential |
| backoff, retry_delay is used as base and will be converted to seconds. |
| |
| |
| |
| |
| .. method:: ready_for_retry(self) |
| |
| Checks on whether the task instance is in the right state and timeframe |
| to be retried. |
| |
| |
| |
| |
| .. method:: get_dagrun(self, session: Session = None) |
| |
| Returns the DagRun for this TaskInstance |
| |
| :param session: SQLAlchemy ORM Session |
| :return: DagRun |
| |
| |
| |
| |
| .. method:: check_and_change_state_before_execution(self, verbose: bool = True, ignore_all_deps: bool = False, ignore_depends_on_past: bool = False, ignore_task_deps: bool = False, ignore_ti_state: bool = False, mark_success: bool = False, test_mode: bool = False, job_id: Optional[str] = None, pool: Optional[str] = None, session=None) |
| |
| Checks dependencies and then sets state to RUNNING if they are met. Returns |
| True if and only if state is set to RUNNING, which implies that task should be |
| executed, in preparation for _run_raw_task |
| |
| :param verbose: whether to turn on more verbose logging |
| :type verbose: bool |
| :param ignore_all_deps: Ignore all of the non-critical dependencies, just runs |
| :type ignore_all_deps: bool |
| :param ignore_depends_on_past: Ignore depends_on_past DAG attribute |
| :type ignore_depends_on_past: bool |
| :param ignore_task_deps: Don't check the dependencies of this TaskInstance's task |
| :type ignore_task_deps: bool |
| :param ignore_ti_state: Disregards previous task instance state |
| :type ignore_ti_state: bool |
| :param mark_success: Don't run the task, mark its state as success |
| :type mark_success: bool |
| :param test_mode: Doesn't record success or failure in the DB |
| :type test_mode: bool |
| :param job_id: Job (BackfillJob / LocalTaskJob / SchedulerJob) ID |
| :type job_id: str |
| :param pool: specifies the pool to use to run the task instance |
| :type pool: str |
| :param session: SQLAlchemy ORM Session |
| :type session: Session |
| :return: whether the state was changed to running or not |
| :rtype: bool |
| |
| |
| |
| |
| .. method:: _date_or_empty(self, attr) |
| |
| |
| |
| |
| .. method:: _run_raw_task(self, mark_success: bool = False, test_mode: bool = False, job_id: Optional[str] = None, pool: Optional[str] = None, error_file: Optional[str] = None, session=None) |
| |
| Immediately runs the task (without checking or changing db state |
| before execution) and then sets the appropriate final state after |
| completion and runs any post-execute callbacks. Meant to be called |
| only after another function changes the state to running. |
| |
| :param mark_success: Don't run the task, mark its state as success |
| :type mark_success: bool |
| :param test_mode: Doesn't record success or failure in the DB |
| :type test_mode: bool |
| :param pool: specifies the pool to use to run the task instance |
| :type pool: str |
| :param session: SQLAlchemy ORM Session |
| :type session: Session |
| |
| |
| |
| |
| .. method:: _prepare_and_execute_task_with_callbacks(self, context, task) |
| |
| Prepare Task for Execution |
| |
| |
| |
| |
| .. method:: _update_ti_state_for_sensing(self, session=None) |
| |
| |
| |
| |
| .. method:: _execute_task(self, context, task_copy) |
| |
| Executes Task (optionally with a Timeout) and pushes Xcom results |
| |
| |
| |
| |
| .. method:: _run_execute_callback(self, context: Context, task) |
| |
| Functions that need to be run before a Task is executed |
| |
| |
| |
| |
| .. method:: _run_finished_callback(self, error: Optional[Union[str, Exception]] = None) |
| |
| Call callback defined for finished state change. |
| |
| NOTE: Only invoke this function from caller of self._run_raw_task or |
| self.run |
| |
| |
| |
| |
| .. method:: run(self, verbose: bool = True, ignore_all_deps: bool = False, ignore_depends_on_past: bool = False, ignore_task_deps: bool = False, ignore_ti_state: bool = False, mark_success: bool = False, test_mode: bool = False, job_id: Optional[str] = None, pool: Optional[str] = None, session=None) |
| |
| Run TaskInstance |
| |
| |
| |
| |
| .. method:: dry_run(self) |
| |
| Only Renders Templates for the TI |
| |
| |
| |
| |
| .. method:: _handle_reschedule(self, actual_start_date, reschedule_exception, test_mode=False, session=None) |
| |
| |
| |
| |
| .. method:: handle_failure(self, error: Union[str, Exception], test_mode: Optional[bool] = None, force_fail: bool = False, error_file: Optional[str] = None, session=None) |
| |
| Handle Failure for the TaskInstance |
| |
| |
| |
| |
| .. method:: handle_failure_with_callback(self, error: Union[str, Exception], test_mode: Optional[bool] = None, force_fail: bool = False, session=None) |
| |
| |
| |
| |
| .. method:: is_eligible_to_retry(self) |
| |
| Is task instance is eligible for retry |
| |
| |
| |
| |
| .. method:: _safe_date(self, date_attr, fmt) |
| |
| |
| |
| |
| .. method:: get_template_context(self, session=None) |
| |
| Return TI Context |
| |
| |
| |
| |
| .. method:: get_rendered_template_fields(self) |
| |
| Fetch rendered template fields from DB |
| |
| |
| |
| |
| .. method:: get_rendered_k8s_spec(self) |
| |
| Fetch rendered template fields from DB |
| |
| |
| |
| |
| .. method:: overwrite_params_with_dag_run_conf(self, params, dag_run) |
| |
| Overwrite Task Params with DagRun.conf |
| |
| |
| |
| |
| .. method:: render_templates(self, context: Optional[Context] = None) |
| |
| Render templates in the operator fields. |
| |
| |
| |
| |
| .. method:: render_k8s_pod_yaml(self) |
| |
| Render k8s pod yaml |
| |
| |
| |
| |
| .. method:: get_email_subject_content(self, exception) |
| |
| Get the email subject content for exceptions. |
| |
| |
| |
| |
| .. method:: email_alert(self, exception) |
| |
| Send alert email with exception information. |
| |
| |
| |
| |
| .. method:: set_duration(self) |
| |
| Set TI duration |
| |
| |
| |
| |
| .. method:: xcom_push(self, key: str, value: Any, execution_date: Optional[datetime] = None, session: Session = None) |
| |
| Make an XCom available for tasks to pull. |
| |
| :param key: A key for the XCom |
| :type key: str |
| :param value: A value for the XCom. The value is pickled and stored |
| in the database. |
| :type value: any picklable object |
| :param execution_date: if provided, the XCom will not be visible until |
| this date. This can be used, for example, to send a message to a |
| task on a future date without it being immediately visible. |
| :type execution_date: datetime |
| :param session: Sqlalchemy ORM Session |
| :type session: Session |
| |
| |
| |
| |
| .. method:: xcom_pull(self, task_ids: Optional[Union[str, Iterable[str]]] = None, dag_id: Optional[str] = None, key: str = XCOM_RETURN_KEY, include_prior_dates: bool = False, session: Session = None) |
| |
| Pull XComs that optionally meet certain criteria. |
| |
| The default value for `key` limits the search to XComs |
| that were returned by other tasks (as opposed to those that were pushed |
| manually). To remove this filter, pass key=None (or any desired value). |
| |
| If a single task_id string is provided, the result is the value of the |
| most recent matching XCom from that task_id. If multiple task_ids are |
| provided, a tuple of matching values is returned. None is returned |
| whenever no matches are found. |
| |
| :param key: A key for the XCom. If provided, only XComs with matching |
| keys will be returned. The default key is 'return_value', also |
| available as a constant XCOM_RETURN_KEY. This key is automatically |
| given to XComs returned by tasks (as opposed to being pushed |
| manually). To remove the filter, pass key=None. |
| :type key: str |
| :param task_ids: Only XComs from tasks with matching ids will be |
| pulled. Can pass None to remove the filter. |
| :type task_ids: str or iterable of strings (representing task_ids) |
| :param dag_id: If provided, only pulls XComs from this DAG. |
| If None (default), the DAG of the calling task is used. |
| :type dag_id: str |
| :param include_prior_dates: If False, only XComs from the current |
| execution_date are returned. If True, XComs from previous dates |
| are returned as well. |
| :type include_prior_dates: bool |
| :param session: Sqlalchemy ORM Session |
| :type session: Session |
| |
| |
| |
| |
| .. method:: get_num_running_task_instances(self, session) |
| |
| Return Number of running TIs from the DB |
| |
| |
| |
| |
| .. method:: init_run_context(self, raw=False) |
| |
| Sets the log context. |
| |
| |
| |
| |
| .. staticmethod:: filter_for_tis(tis: Iterable[Union['TaskInstance', TaskInstanceKey]]) |
| |
| Returns SQLAlchemy filter to query selected task instances |
| |
| |
| |
| |
| .. function:: clear_task_instances(tis, session, activate_dag_runs=None, dag=None, dag_run_state: Union[DagRunState, Literal[False]] = DagRunState.QUEUED) |
| Clears a set of task instances, but makes sure the running ones |
| get killed. |
| |
| :param tis: a list of task instances |
| :param session: current session |
| :param dag_run_state: state to set DagRun to. If set to False, dagrun state will not |
| be changed. |
| :param dag: DAG object |
| :param activate_dag_runs: Deprecated parameter, do not pass |
| |
| |
| .. py:class:: TaskReschedule(task, execution_date, try_number, start_date, end_date, reschedule_date) |
| |
| Bases: :class:`airflow.models.base.Base` |
| |
| TaskReschedule tracks rescheduled task instances. |
| |
| .. attribute:: __tablename__ |
| :annotation: = task_reschedule |
| |
| |
| |
| .. attribute:: id |
| |
| |
| |
| |
| .. attribute:: task_id |
| |
| |
| |
| |
| .. attribute:: dag_id |
| |
| |
| |
| |
| .. attribute:: execution_date |
| |
| |
| |
| |
| .. attribute:: try_number |
| |
| |
| |
| |
| .. attribute:: start_date |
| |
| |
| |
| |
| .. attribute:: end_date |
| |
| |
| |
| |
| .. attribute:: duration |
| |
| |
| |
| |
| .. attribute:: reschedule_date |
| |
| |
| |
| |
| .. attribute:: __table_args__ |
| |
| |
| |
| |
| |
| .. staticmethod:: query_for_task_instance(task_instance, descending=False, session=None) |
| |
| Returns query for task reschedules for a given the task instance. |
| |
| :param session: the database session object |
| :type session: sqlalchemy.orm.session.Session |
| :param task_instance: the task instance to find task reschedules for |
| :type task_instance: airflow.models.TaskInstance |
| :param descending: If True then records are returned in descending order |
| :type descending: bool |
| |
| |
| |
| |
| .. staticmethod:: find_for_task_instance(task_instance, session=None) |
| |
| Returns all task reschedules for the task instance and try number, |
| in ascending order. |
| |
| :param session: the database session object |
| :type session: sqlalchemy.orm.session.Session |
| :param task_instance: the task instance to find task reschedules for |
| :type task_instance: airflow.models.TaskInstance |
| |
| |
| |
| |
| .. py:class:: Variable(key=None, val=None, description=None) |
| |
| Bases: :class:`airflow.models.base.Base`, :class:`airflow.utils.log.logging_mixin.LoggingMixin` |
| |
| Variables are a generic way to store and retrieve arbitrary content or settings |
| as a simple key value store within Airflow. |
| |
| .. attribute:: __tablename__ |
| :annotation: = variable |
| |
| |
| |
| .. attribute:: __NO_DEFAULT_SENTINEL |
| |
| |
| |
| |
| .. attribute:: id |
| |
| |
| |
| |
| .. attribute:: key |
| |
| |
| |
| |
| .. attribute:: _val |
| |
| |
| |
| |
| .. attribute:: description |
| |
| |
| |
| |
| .. attribute:: is_encrypted |
| |
| |
| |
| |
| .. attribute:: val |
| |
| |
| Get Airflow Variable from Metadata DB and decode it using the Fernet Key |
| |
| |
| |
| .. method:: on_db_load(self) |
| |
| |
| |
| |
| .. method:: __repr__(self) |
| |
| |
| |
| |
| .. method:: get_val(self) |
| |
| Get Airflow Variable from Metadata DB and decode it using the Fernet Key |
| |
| |
| |
| |
| .. method:: set_val(self, value) |
| |
| Encode the specified value with Fernet Key and store it in Variables Table. |
| |
| |
| |
| |
| .. classmethod:: setdefault(cls, key, default, deserialize_json=False) |
| |
| Like a Python builtin dict object, setdefault returns the current value |
| for a key, and if it isn't there, stores the default value and returns it. |
| |
| :param key: Dict key for this Variable |
| :type key: str |
| :param default: Default value to set and return if the variable |
| isn't already in the DB |
| :type default: Mixed |
| :param deserialize_json: Store this as a JSON encoded value in the DB |
| and un-encode it when retrieving a value |
| :return: Mixed |
| |
| |
| |
| |
| .. classmethod:: get(cls, key: str, default_var: Any = __NO_DEFAULT_SENTINEL, deserialize_json: bool = False) |
| |
| Gets a value for an Airflow Variable Key |
| |
| :param key: Variable Key |
| :param default_var: Default value of the Variable if the Variable doesn't exists |
| :param deserialize_json: Deserialize the value to a Python dict |
| |
| |
| |
| |
| .. classmethod:: set(cls, key: str, value: Any, serialize_json: bool = False, session: Session = None) |
| |
| Sets a value for an Airflow Variable with a given Key |
| |
| :param key: Variable Key |
| :param value: Value to set for the Variable |
| :param serialize_json: Serialize the value to a JSON string |
| :param session: SQL Alchemy Sessions |
| |
| |
| |
| |
| .. classmethod:: delete(cls, key: str, session: Session = None) |
| |
| Delete an Airflow Variable for a given key |
| |
| :param key: Variable Key |
| :param session: SQL Alchemy Sessions |
| |
| |
| |
| |
| .. method:: rotate_fernet_key(self) |
| |
| Rotate Fernet Key |
| |
| |
| |
| |
| .. staticmethod:: get_variable_from_secrets(key: str) |
| |
| Get Airflow Variable by iterating over all Secret Backends. |
| |
| :param key: Variable Key |
| :return: Variable Value |
| |
| |
| |
| |
| .. data:: XCOM_RETURN_KEY |
| :annotation: = return_value |
| |
| |
| |
| .. data:: XCom |
| |
| |
| |
| |