blob: 3a4b6c7c5e0441700b0cc7125eac7e3dc06ef277 [file] [log] [blame]
:py:mod:`airflow.models.taskinstance`
=====================================
.. py:module:: airflow.models.taskinstance
Module Contents
---------------
Classes
~~~~~~~
.. autoapisummary::
airflow.models.taskinstance.TaskInstanceKey
airflow.models.taskinstance.TaskInstance
airflow.models.taskinstance.SimpleTaskInstance
Functions
~~~~~~~~~
.. autoapisummary::
airflow.models.taskinstance.set_current_context
airflow.models.taskinstance.load_error_file
airflow.models.taskinstance.set_error_file
airflow.models.taskinstance.clear_task_instances
Attributes
~~~~~~~~~~
.. autoapisummary::
airflow.models.taskinstance.TR
airflow.models.taskinstance.log
airflow.models.taskinstance.TaskInstanceStateType
airflow.models.taskinstance.queued_by_job
.. py:data:: TR
.. py:data:: log
.. py:function:: set_current_context(context)
Sets the current execution context to the provided context object.
This method should be called once per Task execution, before calling operator.execute.
.. py:function:: load_error_file(fd)
Load and return error from error file
.. py:function:: set_error_file(error_file, error)
Write error into error file by path
.. py:function:: clear_task_instances(tis, session, activate_dag_runs=None, dag=None, dag_run_state = DagRunState.QUEUED)
Clears a set of task instances, but makes sure the running ones
get killed.
:param tis: a list of task instances
:param session: current session
:param dag_run_state: state to set DagRun to. If set to False, dagrun state will not
be changed.
:param dag: DAG object
:param activate_dag_runs: Deprecated parameter, do not pass
.. py:class:: TaskInstanceKey
Bases: :py:obj:`NamedTuple`
Key used to identify task instance.
.. py:attribute:: dag_id
:annotation: :str
.. py:attribute:: task_id
:annotation: :str
.. py:attribute:: run_id
:annotation: :str
.. py:attribute:: try_number
:annotation: :int = 1
.. py:attribute:: map_index
:annotation: :int
.. py:method:: primary(self)
:property:
Return task instance primary key part of the key
.. py:method:: reduced(self)
:property:
Remake the key by subtracting 1 from try number to match in memory information
.. py:method:: with_try_number(self, try_number)
Returns TaskInstanceKey with provided ``try_number``
.. py:method:: key(self)
:property:
For API-compatibly with TaskInstance.
Returns self
.. py:class:: TaskInstance(task, execution_date = None, run_id = None, state = None, map_index = -1)
Bases: :py:obj:`airflow.models.base.Base`, :py:obj:`airflow.utils.log.logging_mixin.LoggingMixin`
Task instances store the state of a task instance. This table is the
authority and single source of truth around what tasks have run and the
state they are in.
The SqlAlchemy model doesn't have a SqlAlchemy foreign key to the task or
dag model deliberately to have more control over transactions.
Database transactions on this table should insure double triggers and
any confusion around what task instances are or aren't ready to run
even while multiple schedulers may be firing task instances.
A value of -1 in map_index represents any of: a TI without mapped tasks;
a TI with mapped tasks that has yet to be expanded (state=pending);
a TI with mapped tasks that expanded to an empty list (state=skipped).
.. py:attribute:: __tablename__
:annotation: = task_instance
.. py:attribute:: task_id
.. py:attribute:: dag_id
.. py:attribute:: run_id
.. py:attribute:: map_index
.. py:attribute:: start_date
.. py:attribute:: end_date
.. py:attribute:: duration
.. py:attribute:: state
.. py:attribute:: max_tries
.. py:attribute:: hostname
.. py:attribute:: unixname
.. py:attribute:: job_id
.. py:attribute:: pool
.. py:attribute:: pool_slots
.. py:attribute:: queue
.. py:attribute:: priority_weight
.. py:attribute:: operator
.. py:attribute:: queued_dttm
.. py:attribute:: queued_by_job_id
.. py:attribute:: pid
.. py:attribute:: executor_config
.. py:attribute:: external_executor_id
.. py:attribute:: trigger_id
.. py:attribute:: trigger_timeout
.. py:attribute:: next_method
.. py:attribute:: next_kwargs
.. py:attribute:: __table_args__
.. py:attribute:: dag_model
.. py:attribute:: trigger
.. py:attribute:: dag_run
.. py:attribute:: rendered_task_instance_fields
.. py:attribute:: execution_date
.. py:attribute:: task
:annotation: :airflow.models.operator.Operator
.. py:method:: insert_mapping(run_id, task, map_index)
:staticmethod:
:meta private:
.. py:method:: init_on_load(self)
Initialize the attributes that aren't stored in the DB
.. py:method:: try_number(self)
:property:
Return the try number that this task number will be when it is actually
run.
If the TaskInstance is currently running, this will match the column in the
database, in all other cases this will be incremented.
.. py:method:: prev_attempted_tries(self)
:property:
Based on this instance's try_number, this will calculate
the number of previously attempted tries, defaulting to 0.
.. py:method:: next_try_number(self)
:property:
Setting Next Try Number
.. py:method:: command_as_list(self, mark_success=False, ignore_all_deps=False, ignore_task_deps=False, ignore_depends_on_past=False, ignore_ti_state=False, local=False, pickle_id=None, raw=False, job_id=None, pool=None, cfg_path=None)
Returns a command that can be executed anywhere where airflow is
installed. This command is part of the message sent to executors by
the orchestrator.
.. py:method:: generate_command(dag_id, task_id, run_id, mark_success = False, ignore_all_deps = False, ignore_depends_on_past = False, ignore_task_deps = False, ignore_ti_state = False, local = False, pickle_id = None, file_path = None, raw = False, job_id = None, pool = None, cfg_path = None, map_index = -1)
:staticmethod:
Generates the shell command required to execute this task instance.
:param dag_id: DAG ID
:param task_id: Task ID
:param run_id: The run_id of this task's DagRun
:param mark_success: Whether to mark the task as successful
:param ignore_all_deps: Ignore all ignorable dependencies.
Overrides the other ignore_* parameters.
:param ignore_depends_on_past: Ignore depends_on_past parameter of DAGs
(e.g. for Backfills)
:param ignore_task_deps: Ignore task-specific dependencies such as depends_on_past
and trigger rule
:param ignore_ti_state: Ignore the task instance's previous failure/success
:param local: Whether to run the task locally
:param pickle_id: If the DAG was serialized to the DB, the ID
associated with the pickled DAG
:param file_path: path to the file containing the DAG definition
:param raw: raw mode (needs more details)
:param job_id: job ID (needs more details)
:param pool: the Airflow pool that the task should run in
:param cfg_path: the Path to the configuration file
:return: shell command that can be used to run the task instance
:rtype: list[str]
.. py:method:: log_url(self)
:property:
Log URL for TaskInstance
.. py:method:: mark_success_url(self)
:property:
URL to mark TI success
.. py:method:: current_state(self, session=NEW_SESSION)
Get the very latest state from the database, if a session is passed,
we use and looking up the state becomes part of the session, otherwise
a new session is used.
:param session: SQLAlchemy ORM Session
.. py:method:: error(self, session=NEW_SESSION)
Forces the task instance's state to FAILED in the database.
:param session: SQLAlchemy ORM Session
.. py:method:: refresh_from_db(self, session=NEW_SESSION, lock_for_update=False)
Refreshes the task instance from the database based on the primary key
:param session: SQLAlchemy ORM Session
:param lock_for_update: if True, indicates that the database should
lock the TaskInstance (issuing a FOR UPDATE clause) until the
session is committed.
.. py:method:: refresh_from_task(self, task, pool_override=None)
Copy common attributes from the given task.
:param task: The task object to copy from
:param pool_override: Use the pool_override instead of task's pool
.. py:method:: clear_xcom_data(self, session = NEW_SESSION)
Clear all XCom data from the database for the task instance.
If the task is unmapped, all XComs matching this task ID in the same DAG
run are removed. If the task is mapped, only the one with matching map
index is removed.
:param session: SQLAlchemy ORM Session
.. py:method:: key(self)
:property:
Returns a tuple that identifies the task instance uniquely
.. py:method:: set_state(self, state, session=NEW_SESSION)
Set TaskInstance state.
:param state: State to set for the TI
:param session: SQLAlchemy ORM Session
.. py:method:: is_premature(self)
:property:
Returns whether a task is in UP_FOR_RETRY state and its retry interval
has elapsed.
.. py:method:: are_dependents_done(self, session=NEW_SESSION)
Checks whether the immediate dependents of this task instance have succeeded or have been skipped.
This is meant to be used by wait_for_downstream.
This is useful when you do not want to start processing the next
schedule of a task until the dependents are done. For instance,
if the task DROPs and recreates a table.
:param session: SQLAlchemy ORM Session
.. py:method:: get_previous_dagrun(self, state = None, session = None)
The DagRun that ran before this task instance's DagRun.
:param state: If passed, it only take into account instances of a specific state.
:param session: SQLAlchemy ORM Session.
.. py:method:: get_previous_ti(self, state = None, session = NEW_SESSION)
The task instance for the task that ran before this task instance.
:param state: If passed, it only take into account instances of a specific state.
:param session: SQLAlchemy ORM Session
.. py:method:: previous_ti(self)
:property:
This attribute is deprecated.
Please use `airflow.models.taskinstance.TaskInstance.get_previous_ti` method.
.. py:method:: previous_ti_success(self)
:property:
This attribute is deprecated.
Please use `airflow.models.taskinstance.TaskInstance.get_previous_ti` method.
.. py:method:: get_previous_execution_date(self, state = None, session = NEW_SESSION)
The execution date from property previous_ti_success.
:param state: If passed, it only take into account instances of a specific state.
:param session: SQLAlchemy ORM Session
.. py:method:: get_previous_start_date(self, state = None, session = NEW_SESSION)
The start date from property previous_ti_success.
:param state: If passed, it only take into account instances of a specific state.
:param session: SQLAlchemy ORM Session
.. py:method:: previous_start_date_success(self)
:property:
This attribute is deprecated.
Please use `airflow.models.taskinstance.TaskInstance.get_previous_start_date` method.
.. py:method:: are_dependencies_met(self, dep_context=None, session=NEW_SESSION, verbose=False)
Returns whether or not all the conditions are met for this task instance to be run
given the context for the dependencies (e.g. a task instance being force run from
the UI will ignore some dependencies).
:param dep_context: The execution context that determines the dependencies that
should be evaluated.
:param session: database session
:param verbose: whether log details on failed dependencies on
info or debug log level
.. py:method:: get_failed_dep_statuses(self, dep_context=None, session=NEW_SESSION)
Get failed Dependencies
.. py:method:: __repr__(self)
Return repr(self).
.. py:method:: next_retry_datetime(self)
Get datetime of the next retry if the task instance fails. For exponential
backoff, retry_delay is used as base and will be converted to seconds.
.. py:method:: ready_for_retry(self)
Checks on whether the task instance is in the right state and timeframe
to be retried.
.. py:method:: get_dagrun(self, session = NEW_SESSION)
Returns the DagRun for this TaskInstance
:param session: SQLAlchemy ORM Session
:return: DagRun
.. py:method:: check_and_change_state_before_execution(self, verbose = True, ignore_all_deps = False, ignore_depends_on_past = False, ignore_task_deps = False, ignore_ti_state = False, mark_success = False, test_mode = False, job_id = None, pool = None, external_executor_id = None, session=NEW_SESSION)
Checks dependencies and then sets state to RUNNING if they are met. Returns
True if and only if state is set to RUNNING, which implies that task should be
executed, in preparation for _run_raw_task
:param verbose: whether to turn on more verbose logging
:param ignore_all_deps: Ignore all of the non-critical dependencies, just runs
:param ignore_depends_on_past: Ignore depends_on_past DAG attribute
:param ignore_task_deps: Don't check the dependencies of this TaskInstance's task
:param ignore_ti_state: Disregards previous task instance state
:param mark_success: Don't run the task, mark its state as success
:param test_mode: Doesn't record success or failure in the DB
:param job_id: Job (BackfillJob / LocalTaskJob / SchedulerJob) ID
:param pool: specifies the pool to use to run the task instance
:param external_executor_id: The identifier of the celery executor
:param session: SQLAlchemy ORM Session
:return: whether the state was changed to running or not
:rtype: bool
.. py:method:: clear_next_method_args(self)
.. py:method:: run(self, verbose = True, ignore_all_deps = False, ignore_depends_on_past = False, ignore_task_deps = False, ignore_ti_state = False, mark_success = False, test_mode = False, job_id = None, pool = None, session=NEW_SESSION)
Run TaskInstance
.. py:method:: dry_run(self)
Only Renders Templates for the TI
.. py:method:: get_truncated_error_traceback(error, truncate_to)
:staticmethod:
Truncates the traceback of an exception to the first frame called from within a given function
:param error: exception to get traceback from
:param truncate_to: Function to truncate TB to. Must have a ``__code__`` attribute
:meta private:
.. py:method:: handle_failure(self, error = None, test_mode = None, force_fail = False, error_file = None, session = NEW_SESSION)
Handle Failure for the TaskInstance
.. py:method:: handle_failure_with_callback(self, error, test_mode = None, force_fail = False, session=NEW_SESSION)
.. py:method:: is_eligible_to_retry(self)
Is task instance is eligible for retry
.. py:method:: get_template_context(self, session = NEW_SESSION, ignore_param_exceptions = True)
Return TI Context
.. py:method:: get_rendered_template_fields(self, session = NEW_SESSION)
Fetch rendered template fields from DB
.. py:method:: get_rendered_k8s_spec(self, session=NEW_SESSION)
Fetch rendered template fields from DB
.. py:method:: overwrite_params_with_dag_run_conf(self, params, dag_run)
Overwrite Task Params with DagRun.conf
.. py:method:: render_templates(self, context = None)
Render templates in the operator fields.
If the task was originally mapped, this may replace ``self.task`` with
the unmapped, fully rendered BaseOperator. The original ``self.task``
before replacement is returned.
.. py:method:: render_k8s_pod_yaml(self)
Render k8s pod yaml
.. py:method:: get_email_subject_content(self, exception, task = None)
Get the email subject content for exceptions.
.. py:method:: email_alert(self, exception, task)
Send alert email with exception information.
.. py:method:: set_duration(self)
Set TI duration
.. py:method:: xcom_push(self, key, value, execution_date = None, session = NEW_SESSION)
Make an XCom available for tasks to pull.
:param key: Key to store the value under.
:param value: Value to store. What types are possible depends on whether
``enable_xcom_pickling`` is true or not. If so, this can be any
picklable object; only be JSON-serializable may be used otherwise.
:param execution_date: Deprecated parameter that has no effect.
.. py:method:: xcom_pull(self, task_ids = None, dag_id = None, key = XCOM_RETURN_KEY, include_prior_dates = False, session = NEW_SESSION, *, map_indexes = None, default = None)
Pull XComs that optionally meet certain criteria.
:param key: A key for the XCom. If provided, only XComs with matching
keys will be returned. The default key is ``'return_value'``, also
available as constant ``XCOM_RETURN_KEY``. This key is automatically
given to XComs returned by tasks (as opposed to being pushed
manually). To remove the filter, pass *None*.
:param task_ids: Only XComs from tasks with matching ids will be
pulled. Pass *None* to remove the filter.
:param dag_id: If provided, only pulls XComs from this DAG. If *None*
(default), the DAG of the calling task is used.
:param map_indexes: If provided, only pull XComs with matching indexes.
If *None* (default), this is inferred from the task(s) being pulled
(see below for details).
:param include_prior_dates: If False, only XComs from the current
execution_date are returned. If *True*, XComs from previous dates
are returned as well.
When pulling one single task (``task_id`` is *None* or a str) without
specifying ``map_indexes``, the return value is inferred from whether
the specified task is mapped. If not, value from the one single task
instance is returned. If the task to pull is mapped, an iterator (not a
list) yielding XComs from mapped task instances is returned. In either
case, ``default`` (*None* if not specified) is returned if no matching
XComs are found.
When pulling multiple tasks (i.e. either ``task_id`` or ``map_index`` is
a non-str iterable), a list of matching XComs is returned. Elements in
the list is ordered by item ordering in ``task_id`` and ``map_index``.
.. py:method:: get_num_running_task_instances(self, session)
Return Number of running TIs from the DB
.. py:method:: init_run_context(self, raw=False)
Sets the log context.
.. py:method:: filter_for_tis(tis)
:staticmethod:
Returns SQLAlchemy filter to query selected task instances
.. py:method:: ti_selector_condition(cls, vals)
:classmethod:
Build an SQLAlchemy filter for a list where each element can contain
whether a task_id, or a tuple of (task_id,map_index)
:meta private:
.. py:data:: TaskInstanceStateType
.. py:class:: SimpleTaskInstance(dag_id, task_id, run_id, start_date, end_date, try_number, map_index, state, executor_config, pool, queue, key, run_as_user = None, priority_weight = None)
Simplified Task Instance.
Used to send data between processes via Queues.
.. py:method:: __eq__(self, other)
Return self==value.
.. py:method:: from_ti(cls, ti)
:classmethod:
.. py:method:: from_dict(cls, obj_dict)
:classmethod:
.. py:data:: queued_by_job