:mod:`airflow.models.taskinstance`
==================================
.. py:module:: airflow.models.taskinstance
Module Contents
---------------
.. data:: ApiClient
.. data:: TR
.. data:: Context
.. data:: _CURRENT_CONTEXT
:annotation: :List[Context] = []
.. data:: log
.. function:: set_current_context(context: Context)
Sets the current execution context to the provided context object.
This method should be called once per Task execution, before calling operator.execute.
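
The "once per task execution" contract can be pictured as a stack of contexts that is pushed before ``execute`` and popped afterwards. The following is a standalone sketch rather than Airflow's implementation: ``Context`` is aliased to a plain ``dict``, the context-manager form is an assumption, and ``current_context`` is a hypothetical helper added only for illustration.

.. code-block:: python

    import contextlib
    from typing import Any, Dict, Iterator, List

    Context = Dict[str, Any]  # stand-in for the real Context type
    _CURRENT_CONTEXT: List[Context] = []


    @contextlib.contextmanager
    def set_current_context(context: Context) -> Iterator[Context]:
        """Make ``context`` the current one for a single task execution."""
        _CURRENT_CONTEXT.append(context)
        try:
            yield context
        finally:
            # Pop even if the task raises, so a stale context never leaks.
            _CURRENT_CONTEXT.pop()


    def current_context() -> Context:
        """Hypothetical helper: return the innermost active context."""
        if not _CURRENT_CONTEXT:
            raise RuntimeError("no task is currently executing")
        return _CURRENT_CONTEXT[-1]


    with set_current_context({"task_id": "load"}):
        # Code running inside the task can now look up its context.
        active_task = current_context()["task_id"]
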
.. function:: clear_task_instances(tis, session, activate_dag_runs=True, dag=None)
Clears a set of task instances, but makes sure the running ones
get killed.
:param tis: a list of task instances
:param session: current session
:param activate_dag_runs: whether to set the DAG runs of the cleared task instances back to the running state
:param dag: DAG object
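
A minimal sketch of the "clear, but kill the running ones" rule, using plain objects instead of ORM rows. The state names (in particular ``shutdown`` for running instances) and the ``SketchTI`` class are assumptions made for illustration. Session handling, DAG-run activation and retry bookkeeping are left out.

.. code-block:: python

    from typing import List, Optional

    RUNNING = "running"
    SHUTDOWN = "shutdown"  # assumed state used to signal a running task to stop


    class SketchTI:
        """Hypothetical stand-in for a task instance row."""

        def __init__(self, task_id: str, state: Optional[str]) -> None:
            self.task_id = task_id
            self.state = state


    def sketch_clear_task_instances(tis: List[SketchTI]) -> List[str]:
        """Reset the state of ``tis``; return the ids of the ones asked to stop."""
        stopped = []
        for ti in tis:
            if ti.state == RUNNING:
                # A running instance cannot simply be reset: it must be told to stop.
                ti.state = SHUTDOWN
                stopped.append(ti.task_id)
            else:
                # Any other instance goes back to "no state" so it can be rescheduled.
                ti.state = None
        return stopped
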
.. py:class:: TaskInstanceKey
Bases: :class:`typing.NamedTuple`
Key used to identify task instance.
.. attribute:: dag_id
:annotation: :str
.. attribute:: task_id
:annotation: :str
.. attribute:: execution_date
:annotation: :datetime
.. attribute:: try_number
:annotation: :int
.. attribute:: primary
Returns the primary-key part of the key (the key without the try number).
.. attribute:: reduced
Rebuilds the key with the try number reduced by 1, to match in-memory information.
.. method:: with_try_number(self, try_number: int)
Returns TaskInstanceKey with provided ``try_number``
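
The shape of the key can be sketched with a standalone ``NamedTuple``. This is an illustrative re-creation, not the module's source. In particular, keeping the reduced try number at 1 or above is an assumption.

.. code-block:: python

    from datetime import datetime
    from typing import NamedTuple, Tuple


    class SketchTaskInstanceKey(NamedTuple):
        dag_id: str
        task_id: str
        execution_date: datetime
        try_number: int

        @property
        def primary(self) -> Tuple[str, str, datetime]:
            """The part of the key that identifies the row, ignoring retries."""
            return self.dag_id, self.task_id, self.execution_date

        @property
        def reduced(self) -> "SketchTaskInstanceKey":
            """Same key with try_number lowered by one (assumed floor of 1)."""
            return self._replace(try_number=max(1, self.try_number - 1))

        def with_try_number(self, try_number: int) -> "SketchTaskInstanceKey":
            """Same key with an explicit try_number."""
            return self._replace(try_number=try_number)


    key = SketchTaskInstanceKey("etl", "load", datetime(2021, 1, 1), 2)
    same_row = key.primary == key.with_try_number(5).primary  # retries share a primary key
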
.. py:class:: TaskInstance(task, execution_date: datetime, state: Optional[str] = None)
Bases: :class:`airflow.models.base.Base`, :class:`airflow.utils.log.logging_mixin.LoggingMixin`
Task instances store the state of a task instance. This table is the
authority and single source of truth around what tasks have run and the
state they are in.
The SQLAlchemy model deliberately has no SQLAlchemy foreign key to the task or
DAG model, to allow more control over transactions.
Database transactions on this table should guard against double triggers and
any confusion around which task instances are or aren't ready to run,
even while multiple schedulers may be firing task instances.
.. attribute:: __tablename__
:annotation: = task_instance
.. attribute:: task_id
.. attribute:: dag_id
.. attribute:: execution_date
.. attribute:: start_date
.. attribute:: end_date
.. attribute:: duration
.. attribute:: state
.. attribute:: _try_number
.. attribute:: max_tries
.. attribute:: hostname
.. attribute:: unixname
.. attribute:: job_id
.. attribute:: pool
.. attribute:: pool_slots
.. attribute:: queue
.. attribute:: priority_weight
.. attribute:: operator
.. attribute:: queued_dttm
.. attribute:: queued_by_job_id
.. attribute:: pid
.. attribute:: executor_config
.. attribute:: external_executor_id
.. attribute:: __table_args__
.. attribute:: dag_model
.. attribute:: try_number
Returns the try number that this task instance will have when it is actually
run.
If the TaskInstance is currently running, this matches the column in the
database; in all other cases it is incremented by one.
.. attribute:: prev_attempted_tries
Based on this instance's try_number, this will calculate
the number of previously attempted tries, defaulting to 0.
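
The relationship between the stored ``_try_number`` column and the two derived attributes can be sketched as follows. ``SketchTries`` is a hypothetical stand-in class and the ``"running"`` state string is an assumption. Only the arithmetic described above is modelled.

.. code-block:: python

    from typing import Optional


    class SketchTries:
        """Hypothetical stand-in holding only the fields the properties need."""

        def __init__(self, stored_try_number: int = 0, state: Optional[str] = None) -> None:
            self._try_number = stored_try_number  # value of the database column
            self.state = state

        @property
        def try_number(self) -> int:
            # While running, the column already holds the current attempt.
            if self.state == "running":
                return self._try_number
            # Otherwise report the attempt number the next run will have.
            return self._try_number + 1

        @property
        def prev_attempted_tries(self) -> int:
            # Attempts already made; 0 for an instance that has never run.
            return self._try_number


    ti = SketchTries(stored_try_number=2, state="running")
    while_running = ti.try_number      # matches the column
    ti.state = "failed"
    after_failure = ti.try_number      # column value plus one
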
.. attribute:: next_try_number
The try number of the next attempt.
.. attribute:: log_filepath
Filepath for TaskInstance
.. attribute:: log_url
Log URL for TaskInstance
.. attribute:: mark_success_url
URL to mark TI success
.. attribute:: key
Returns a tuple that identifies the task instance uniquely
.. attribute:: is_premature
Returns whether the task is in UP_FOR_RETRY state and its retry interval
has not yet elapsed.
.. attribute:: previous_ti
This attribute is deprecated.
Please use `airflow.models.taskinstance.TaskInstance.get_previous_ti` method.
.. attribute:: previous_ti_success
This attribute is deprecated.
Please use `airflow.models.taskinstance.TaskInstance.get_previous_ti` method.
.. attribute:: previous_start_date_success
This attribute is deprecated.
Please use `airflow.models.taskinstance.TaskInstance.get_previous_start_date` method.
.. method:: init_on_load(self)
Initialize the attributes that aren't stored in the DB
.. method:: command_as_list(self, mark_success=False, ignore_all_deps=False, ignore_task_deps=False, ignore_depends_on_past=False, ignore_ti_state=False, local=False, pickle_id=None, raw=False, job_id=None, pool=None, cfg_path=None)
Returns a command that can be executed anywhere Airflow is
installed. This command is part of the message sent to executors by
the orchestrator.
.. staticmethod:: generate_command(dag_id: str, task_id: str, execution_date: datetime, mark_success: bool = False, ignore_all_deps: bool = False, ignore_depends_on_past: bool = False, ignore_task_deps: bool = False, ignore_ti_state: bool = False, local: bool = False, pickle_id: Optional[int] = None, file_path: Optional[str] = None, raw: bool = False, job_id: Optional[str] = None, pool: Optional[str] = None, cfg_path: Optional[str] = None)
Generates the shell command required to execute this task instance.
:param dag_id: DAG ID
:type dag_id: str
:param task_id: Task ID
:type task_id: str
:param execution_date: Execution date for the task
:type execution_date: datetime
:param mark_success: Whether to mark the task as successful
:type mark_success: bool
:param ignore_all_deps: Ignore all ignorable dependencies.
Overrides the other ignore_* parameters.
:type ignore_all_deps: bool
:param ignore_depends_on_past: Ignore depends_on_past parameter of DAGs
(e.g. for Backfills)
:type ignore_depends_on_past: bool
:param ignore_task_deps: Ignore task-specific dependencies such as depends_on_past
and trigger rule
:type ignore_task_deps: bool
:param ignore_ti_state: Ignore the task instance's previous failure/success
:type ignore_ti_state: bool
:param local: Whether to run the task locally
:type local: bool
:param pickle_id: If the DAG was serialized to the DB, the ID
associated with the pickled DAG
:type pickle_id: Optional[int]
:param file_path: path to the file containing the DAG definition
:type file_path: Optional[str]
:param raw: raw mode (needs more details)
:type raw: bool
:param job_id: job ID (needs more details)
:type job_id: Optional[str]
:param pool: the Airflow pool that the task should run in
:type pool: Optional[str]
:param cfg_path: the path to the configuration file
:type cfg_path: Optional[str]
:return: shell command that can be used to run the task instance
:rtype: list[str]
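
How the parameters translate into a ``list[str]`` command can be sketched with a reduced version of the function. This is an approximation covering only a few parameters. The ``airflow tasks run`` prefix and the flag spellings are assumptions modelled on the Airflow CLI and should be checked against the installed version.

.. code-block:: python

    from datetime import datetime
    from typing import List, Optional


    def sketch_generate_command(
        dag_id: str,
        task_id: str,
        execution_date: datetime,
        mark_success: bool = False,
        local: bool = False,
        raw: bool = False,
        pool: Optional[str] = None,
        cfg_path: Optional[str] = None,
    ) -> List[str]:
        """Build the argument list that identifies and runs one task instance."""
        # Positional part: which task instance to run (assumed CLI layout).
        cmd = ["airflow", "tasks", "run", dag_id, task_id, execution_date.isoformat()]
        # Boolean parameters become bare flags, only when set.
        if mark_success:
            cmd.append("--mark-success")
        if local:
            cmd.append("--local")
        if raw:
            cmd.append("--raw")
        # Optional values become flag/value pairs, only when provided.
        if pool:
            cmd.extend(["--pool", pool])
        if cfg_path:
            cmd.extend(["--cfg-path", cfg_path])
        return cmd


    command = sketch_generate_command(
        "etl", "load", datetime(2021, 1, 1), local=True, pool="default_pool"
    )

Returning a list rather than a single string avoids shell quoting issues when the command is handed to a process launcher.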
.. method:: current_state(self, session=None)
Gets the very latest state from the database. If a session is passed,
it is used and the lookup becomes part of that session; otherwise
a new session is used.
:param session: SQLAlchemy ORM Session
:type session: Session
.. method:: error(self, session=None)
Forces the task instance's state to FAILED in the database.
:param session: SQLAlchemy ORM Session
:type session: Session
.. method:: refresh_from_db(self, session=None, lock_for_update=False)
Refreshes the task instance from the database based on the primary key
:param session: SQLAlchemy ORM Session
:type session: Session
:param lock_for_update: if True, indicates that the database should
lock the TaskInstance (issuing a FOR UPDATE clause) until the
session is committed.
:type lock_for_update: bool
.. method:: refresh_from_task(self, task, pool_override=None)
Copy common attributes from the given task.
:param task: The task object to copy from
:type task: airflow.models.BaseOperator
:param pool_override: Use the pool_override instead of task's pool
:type pool_override: str
.. method:: clear_xcom_data(self, session=None)
Clears all XCom data from the database for the task instance
:param session: SQLAlchemy ORM Session
:type session: Session
.. method:: set_state(self, state: str, session=None)
Set TaskInstance state.
:param state: State to set for the TI
:type state: str
:param session: SQLAlchemy ORM Session
:type session: Session
.. method:: are_dependents_done(self, session=None)
Checks whether the immediate dependents of this task instance have succeeded or have been skipped.
This is meant to be used by wait_for_downstream.
This is useful when you do not want to start processing the next
schedule of a task until the dependents are done. For instance,
if the task DROPs and recreates a table.
:param session: SQLAlchemy ORM Session
:type session: Session
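
The check described for ``are_dependents_done`` amounts to "every immediate downstream task has succeeded or been skipped". A database-free sketch, in which the mapping of task ids to state strings and the function name are assumptions:

.. code-block:: python

    from typing import Dict, Iterable, Optional

    DONE_STATES = {"success", "skipped"}


    def sketch_are_dependents_done(
        downstream_task_ids: Iterable[str],
        states: Dict[str, Optional[str]],
    ) -> bool:
        """Return True when every immediate dependent is in a finished state."""
        # A task without dependents has nothing to wait for, so all() is True.
        return all(states.get(task_id) in DONE_STATES for task_id in downstream_task_ids)


    states = {"transform": "success", "report": "skipped", "publish": "running"}
    can_start_next_schedule = sketch_are_dependents_done(["transform", "report"], states)
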
.. method:: get_previous_ti(self, state: Optional[str] = None, session: Session = None)
The task instance for the task that ran before this task instance.
:param state: If passed, only instances in that state are taken into account.
:param session: SQLAlchemy ORM Session
.. method:: get_previous_execution_date(self, state: Optional[str] = None, session: Session = None)
The execution date of the previous task instance, as found by ``get_previous_ti``.
:param state: If passed, only instances in that state are taken into account.
:param session: SQLAlchemy ORM Session
.. method:: get_previous_start_date(self, state: Optional[str] = None, session: Session = None)
The start date of the previous task instance, as found by ``get_previous_ti``.
:param state: If passed, only instances in that state are taken into account.
:param session: SQLAlchemy ORM Session
.. method:: are_dependencies_met(self, dep_context=None, session=None, verbose=False)
Returns whether or not all the conditions are met for this task instance to be run
given the context for the dependencies (e.g. a task instance being force run from
the UI will ignore some dependencies).
:param dep_context: The execution context that determines the dependencies that
should be evaluated.
:type dep_context: DepContext
:param session: database session
:type session: sqlalchemy.orm.session.Session
:param verbose: whether to log details on failed dependencies at
info or debug log level
:type verbose: bool
.. method:: get_failed_dep_statuses(self, dep_context=None, session=None)
Get failed Dependencies
.. method:: __repr__(self)
.. method:: next_retry_datetime(self)
Get datetime of the next retry if the task instance fails. For exponential
backoff, retry_delay is used as base and will be converted to seconds.
.. method:: ready_for_retry(self)
Checks whether the task instance is in the right state and timeframe
to be retried.
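
The retry timing described for ``next_retry_datetime`` and ``ready_for_retry`` can be approximated as below. This is a simplified sketch under stated assumptions: the delay doubles with each attempt already made, there is no jitter, and the ``attempts_made``, ``exponential_backoff`` and ``max_retry_delay`` parameters are illustrative names rather than the method's signature.

.. code-block:: python

    from datetime import datetime, timedelta
    from typing import Optional


    def sketch_next_retry_datetime(
        end_date: datetime,
        retry_delay: timedelta,
        attempts_made: int,
        exponential_backoff: bool = False,
        max_retry_delay: Optional[timedelta] = None,
    ) -> datetime:
        """Earliest moment at which a failed attempt may be retried."""
        delay = retry_delay
        if exponential_backoff:
            # retry_delay is the base, converted to seconds and doubled for
            # every attempt after the first (assumed growth rule).
            seconds = retry_delay.total_seconds() * 2 ** (attempts_made - 1)
            delay = timedelta(seconds=seconds)
        if max_retry_delay is not None:
            delay = min(delay, max_retry_delay)
        return end_date + delay


    def sketch_ready_for_retry(
        state: Optional[str],
        end_date: datetime,
        retry_delay: timedelta,
        attempts_made: int,
        now: datetime,
    ) -> bool:
        """Right state (up for retry) and right timeframe (delay has passed)."""
        if state != "up_for_retry":
            return False
        return sketch_next_retry_datetime(end_date, retry_delay, attempts_made) < now


    failed_at = datetime(2021, 1, 1, 12, 0)
    third_retry_at = sketch_next_retry_datetime(
        failed_at, timedelta(minutes=5), attempts_made=3, exponential_backoff=True
    )
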
.. method:: get_dagrun(self, session: Session = None)
Returns the DagRun for this TaskInstance
:param session: SQLAlchemy ORM Session
:return: DagRun
.. method:: check_and_change_state_before_execution(self, verbose: bool = True, ignore_all_deps: bool = False, ignore_depends_on_past: bool = False, ignore_task_deps: bool = False, ignore_ti_state: bool = False, mark_success: bool = False, test_mode: bool = False, job_id: Optional[str] = None, pool: Optional[str] = None, session=None)
Checks dependencies and then sets the state to RUNNING if they are met. Returns
True if and only if the state is set to RUNNING, which implies that the task should be
executed, in preparation for _run_raw_task.
:param verbose: whether to turn on more verbose logging
:type verbose: bool
:param ignore_all_deps: Ignore all non-critical dependencies and just run the task
:type ignore_all_deps: bool
:param ignore_depends_on_past: Ignore depends_on_past DAG attribute
:type ignore_depends_on_past: bool
:param ignore_task_deps: Don't check the dependencies of this TaskInstance's task
:type ignore_task_deps: bool
:param ignore_ti_state: Disregards previous task instance state
:type ignore_ti_state: bool
:param mark_success: Don't run the task, mark its state as success
:type mark_success: bool
:param test_mode: Doesn't record success or failure in the DB
:type test_mode: bool
:param job_id: Job (BackfillJob / LocalTaskJob / SchedulerJob) ID
:type job_id: str
:param pool: specifies the pool to use to run the task instance
:type pool: str
:param session: SQLAlchemy ORM Session
:type session: Session
:return: whether the state was changed to running or not
:rtype: bool
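
The two-phase contract (first check dependencies and claim the RUNNING state, then execute) can be sketched with a toy class. Everything here is hypothetical scaffolding: dependency checking is reduced to a boolean and states are plain strings.

.. code-block:: python

    from typing import Optional


    class SketchRunner:
        """Toy model of the check-then-run sequence."""

        def __init__(self, deps_met: bool) -> None:
            self.deps_met = deps_met
            self.state: Optional[str] = None
            self.executed = False

        def check_and_change_state_before_execution(self) -> bool:
            # Phase 1: only claim RUNNING when the dependencies allow it.
            if not self.deps_met:
                return False
            self.state = "running"
            return True

        def _run_raw_task(self) -> None:
            # Phase 2: assumes another call already moved the state to RUNNING.
            if self.state != "running":
                raise RuntimeError("state must be 'running' before execution")
            self.executed = True
            self.state = "success"

        def run(self) -> None:
            # Execute only when phase 1 reported that the state was changed.
            if self.check_and_change_state_before_execution():
                self._run_raw_task()


    ready = SketchRunner(deps_met=True)
    ready.run()
    blocked = SketchRunner(deps_met=False)
    blocked.run()
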
.. method:: _date_or_empty(self, attr)
.. method:: _run_raw_task(self, mark_success: bool = False, test_mode: bool = False, job_id: Optional[str] = None, pool: Optional[str] = None, session=None)
Immediately runs the task (without checking or changing db state
before execution) and then sets the appropriate final state after
completion and runs any post-execute callbacks. Meant to be called
only after another function changes the state to running.
:param mark_success: Don't run the task, mark its state as success
:type mark_success: bool
:param test_mode: Doesn't record success or failure in the DB
:type test_mode: bool
:param pool: specifies the pool to use to run the task instance
:type pool: str
:param session: SQLAlchemy ORM Session
:type session: Session
.. method:: _run_mini_scheduler_on_child_tasks(self, session=None)
.. method:: _prepare_and_execute_task_with_callbacks(self, context, task)
Prepare Task for Execution
.. method:: _update_ti_state_for_sensing(self, session=None)
.. method:: _run_success_callback(self, context, task)
Functions that need to be run if Task is successful
.. method:: _execute_task(self, context, task_copy)
Executes Task (optionally with a Timeout) and pushes Xcom results
.. method:: _run_execute_callback(self, context, task)
Functions that need to be run before a Task is executed
.. method:: run(self, verbose: bool = True, ignore_all_deps: bool = False, ignore_depends_on_past: bool = False, ignore_task_deps: bool = False, ignore_ti_state: bool = False, mark_success: bool = False, test_mode: bool = False, job_id: Optional[str] = None, pool: Optional[str] = None, session=None)
Run TaskInstance
.. method:: dry_run(self)
Only Renders Templates for the TI
.. method:: _handle_reschedule(self, actual_start_date, reschedule_exception, test_mode=False, session=None)
.. method:: handle_failure(self, error, test_mode=None, context=None, force_fail=False, session=None)
Handle Failure for the TaskInstance
.. method:: is_eligible_to_retry(self)
Whether the task instance is eligible for retry
.. method:: _safe_date(self, date_attr, fmt)
.. method:: get_template_context(self, session=None)
Return TI Context
.. method:: get_rendered_template_fields(self)
Fetch rendered template fields from DB
.. method:: get_rendered_k8s_spec(self)
Fetch the rendered Kubernetes pod spec from DB
.. method:: overwrite_params_with_dag_run_conf(self, params, dag_run)
Overwrite Task Params with DagRun.conf
.. method:: render_templates(self, context: Optional[Context] = None)
Render templates in the operator fields.
.. method:: render_k8s_pod_yaml(self)
Render k8s pod yaml
.. method:: get_email_subject_content(self, exception)
Get the email subject content for exceptions.
.. method:: email_alert(self, exception)
Send alert email with exception information.
.. method:: set_duration(self)
Set TI duration
.. method:: xcom_push(self, key: str, value: Any, execution_date: Optional[datetime] = None, session: Session = None)
Make an XCom available for tasks to pull.
:param key: A key for the XCom
:type key: str
:param value: A value for the XCom. The value is pickled and stored
in the database.
:type value: any picklable object
:param execution_date: if provided, the XCom will not be visible until
this date. This can be used, for example, to send a message to a
task on a future date without it being immediately visible.
:type execution_date: datetime
:param session: SQLAlchemy ORM Session
:type session: Session
.. method:: xcom_pull(self, task_ids: Optional[Union[str, Iterable[str]]] = None, dag_id: Optional[str] = None, key: str = XCOM_RETURN_KEY, include_prior_dates: bool = False, session: Session = None)
Pull XComs that optionally meet certain criteria.
The default value for `key` limits the search to XComs
that were returned by other tasks (as opposed to those that were pushed
manually). To remove this filter, pass key=None (or any desired value).
If a single task_id string is provided, the result is the value of the
most recent matching XCom from that task_id. If multiple task_ids are
provided, a tuple of matching values is returned. None is returned
whenever no matches are found.
:param key: A key for the XCom. If provided, only XComs with matching
keys will be returned. The default key is 'return_value', also
available as a constant XCOM_RETURN_KEY. This key is automatically
given to XComs returned by tasks (as opposed to being pushed
manually). To remove the filter, pass key=None.
:type key: str
:param task_ids: Only XComs from tasks with matching ids will be
pulled. Can pass None to remove the filter.
:type task_ids: str or iterable of strings (representing task_ids)
:param dag_id: If provided, only pulls XComs from this DAG.
If None (default), the DAG of the calling task is used.
:type dag_id: str
:param include_prior_dates: If False, only XComs from the current
execution_date are returned. If True, XComs from previous dates
are returned as well.
:type include_prior_dates: bool
:param session: SQLAlchemy ORM Session
:type session: Session
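
The return-shape rules for ``xcom_pull`` (a single value for one task id, a tuple for several, ``None`` when nothing matches) can be illustrated with an in-memory stand-in. ``SketchXComStore`` is hypothetical. It ignores ``dag_id``, ``execution_date``, ``include_prior_dates`` and the ``key=None`` case, and it stores values without pickling.

.. code-block:: python

    from typing import Any, Dict, Iterable, Tuple, Union

    XCOM_RETURN_KEY = "return_value"


    class SketchXComStore:
        """Hypothetical in-memory replacement for the XCom table."""

        def __init__(self) -> None:
            self._values: Dict[Tuple[str, str], Any] = {}

        def push(self, task_id: str, key: str, value: Any) -> None:
            # A later push with the same task id and key replaces the earlier one.
            self._values[(task_id, key)] = value

        def pull(
            self,
            task_ids: Union[str, Iterable[str]],
            key: str = XCOM_RETURN_KEY,
        ) -> Any:
            if isinstance(task_ids, str):
                # Single task id: the value itself, or None when absent.
                return self._values.get((task_ids, key))
            # Several task ids: a tuple of the values that exist.
            found = tuple(
                self._values[(task_id, key)]
                for task_id in task_ids
                if (task_id, key) in self._values
            )
            return found or None


    store = SketchXComStore()
    store.push("extract", XCOM_RETURN_KEY, 42)   # as if returned by the task
    store.push("extract", "row_count", 10)       # as if pushed manually
    returned = store.pull("extract")
    manual = store.pull("extract", key="row_count")
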
.. method:: get_num_running_task_instances(self, session)
Return Number of running TIs from the DB
.. method:: init_run_context(self, raw=False)
Sets the log context.
.. staticmethod:: filter_for_tis(tis: Iterable[Union['TaskInstance', TaskInstanceKey]])
Returns SQLAlchemy filter to query selected task instances
.. data:: TaskInstanceStateType
.. py:class:: SimpleTaskInstance(ti: TaskInstance)
Simplified Task Instance.
Used to send data between processes via Queues.
.. attribute:: dag_id
.. attribute:: task_id
.. attribute:: execution_date
.. attribute:: start_date
.. attribute:: end_date
.. attribute:: try_number
.. attribute:: state
.. attribute:: pool
.. attribute:: priority_weight
.. attribute:: queue
.. attribute:: key
.. attribute:: executor_config
.. method:: construct_task_instance(self, session=None, lock_for_update=False)
Construct a TaskInstance from the database based on the primary key
:param session: DB session.
:param lock_for_update: if True, indicates that the database should
lock the TaskInstance (issuing a FOR UPDATE clause) until the
session is committed.
:return: the task instance constructed
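
The idea behind ``SimpleTaskInstance`` (copy a handful of plain fields so the object can travel through a queue without dragging a database session along) can be sketched as follows. The class, the reduced field list and the use of ``queue.Queue`` in place of an inter-process queue are all assumptions made to keep the example self-contained.

.. code-block:: python

    import queue
    from datetime import datetime
    from types import SimpleNamespace


    class SketchSimpleTI:
        """Hypothetical snapshot of a task instance made of plain values only."""

        FIELDS = ("dag_id", "task_id", "execution_date", "try_number", "state")

        def __init__(self, ti) -> None:
            # Copy values at construction time; later changes to ``ti`` are not seen.
            for name in self.FIELDS:
                setattr(self, name, getattr(ti, name))

        @property
        def key(self):
            return (self.dag_id, self.task_id, self.execution_date, self.try_number)


    # SimpleNamespace stands in for a full TaskInstance here.
    full_ti = SimpleNamespace(
        dag_id="etl",
        task_id="load",
        execution_date=datetime(2021, 1, 1),
        try_number=1,
        state="queued",
    )

    channel = queue.Queue()
    channel.put(SketchSimpleTI(full_ti))
    full_ti.state = "running"      # the snapshot already in the queue is unaffected
    received = channel.get()
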
.. data:: STATICA_HACK
:annotation: = True
.. data:: dag_run