:mod:`airflow.models.taskinstance`
==================================
.. py:module:: airflow.models.taskinstance
Module Contents
---------------
.. function:: clear_task_instances(tis, session, activate_dag_runs=True, dag=None)
Clears a set of task instances, but makes sure the running ones
get killed.
:param tis: a list of task instances
:param session: current session
:param activate_dag_runs: flag to set the associated dag runs back to the running state after clearing
:param dag: DAG object
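A minimal usage sketch, assuming a session obtained from ``airflow.settings`` and a
hypothetical DAG id (both are illustrations, not part of this API):

.. code-block:: python

    from airflow import settings
    from airflow.models.taskinstance import TaskInstance, clear_task_instances

    session = settings.Session()
    # Assumed: collect every task instance of a hypothetical DAG.
    tis = session.query(TaskInstance).filter(
        TaskInstance.dag_id == 'example_dag').all()
    # Clear them; running instances are flagged so they get killed.
    clear_task_instances(tis, session, activate_dag_runs=True)
    session.commit()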
.. py:class:: TaskInstance(task, execution_date, state=None)
Bases: :class:`airflow.models.base.Base`, :class:`airflow.utils.log.logging_mixin.LoggingMixin`
Task instances store the state of a task instance. This table is the
authority and single source of truth around what tasks have run and the
state they are in.
The SqlAlchemy model doesn't have a SqlAlchemy foreign key to the task or
dag model deliberately to have more control over transactions.
Database transactions on this table should guard against double triggers and
any confusion around which task instances are or aren't ready to run,
even while multiple schedulers may be firing task instances.
.. attribute:: __tablename__
:annotation: = task_instance
.. attribute:: task_id
.. attribute:: dag_id
.. attribute:: execution_date
.. attribute:: start_date
.. attribute:: end_date
.. attribute:: duration
.. attribute:: state
.. attribute:: _try_number
.. attribute:: max_tries
.. attribute:: hostname
.. attribute:: unixname
.. attribute:: job_id
.. attribute:: pool
.. attribute:: pool_slots
.. attribute:: queue
.. attribute:: priority_weight
.. attribute:: operator
.. attribute:: queued_dttm
.. attribute:: pid
.. attribute:: executor_config
.. attribute:: __table_args__
.. attribute:: try_number
Return the try number that this task instance will have when it is actually
run.
If the TI is currently running, this will match the column in the
database, in all other cases this will be incremented.
.. attribute:: prev_attempted_tries
Based on this instance's try_number, this will calculate
the number of previously attempted tries, defaulting to 0.
.. attribute:: next_try_number
.. attribute:: log_filepath
.. attribute:: log_url
.. attribute:: mark_success_url
.. attribute:: key
Returns a tuple that identifies the task instance uniquely
.. attribute:: is_premature
Returns whether a task is in UP_FOR_RETRY state and its retry interval
has elapsed.
.. attribute:: previous_ti
The task instance for the task that ran before this task instance.
.. attribute:: previous_ti_success
The task instance from the prior successful dag run for this task, by execution date.
.. attribute:: previous_execution_date_success
The execution date from property previous_ti_success.
.. attribute:: previous_start_date_success
The start date from property previous_ti_success.
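A hedged sketch of reading these properties from inside a running task; the callable
and the assumption that ``ti`` comes from the template context are for illustration only:

.. code-block:: python

    def compare_with_previous_run(ti, **kwargs):
        # `ti` is the TaskInstance passed in the template context.
        prev_ti = ti.previous_ti                        # prior schedule's TI
        prev_exec = ti.previous_execution_date_success  # last successful execution date
        prev_start = ti.previous_start_date_success     # last successful start date
        if prev_exec is not None:
            print("Last successful run covered %s" % prev_exec)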
.. method:: init_on_load(self)
Initialize the attributes that aren't stored in the DB.
.. method:: command(self, mark_success=False, ignore_all_deps=False, ignore_depends_on_past=False, ignore_task_deps=False, ignore_ti_state=False, local=False, pickle_id=None, raw=False, job_id=None, pool=None, cfg_path=None)
Returns a command that can be executed anywhere Airflow is
installed. This command is part of the message sent to executors by
the orchestrator.
.. method:: command_as_list(self, mark_success=False, ignore_all_deps=False, ignore_task_deps=False, ignore_depends_on_past=False, ignore_ti_state=False, local=False, pickle_id=None, raw=False, job_id=None, pool=None, cfg_path=None)
Returns a command that can be executed anywhere Airflow is
installed. This command is part of the message sent to executors by
the orchestrator.
.. staticmethod:: generate_command(dag_id, task_id, execution_date, mark_success=False, ignore_all_deps=False, ignore_depends_on_past=False, ignore_task_deps=False, ignore_ti_state=False, local=False, pickle_id=None, file_path=None, raw=False, job_id=None, pool=None, cfg_path=None)
Generates the shell command required to execute this task instance.
:param dag_id: DAG ID
:type dag_id: unicode
:param task_id: Task ID
:type task_id: unicode
:param execution_date: Execution date for the task
:type execution_date: datetime.datetime
:param mark_success: Whether to mark the task as successful
:type mark_success: bool
:param ignore_all_deps: Ignore all ignorable dependencies.
Overrides the other ignore_* parameters.
:type ignore_all_deps: bool
:param ignore_depends_on_past: Ignore depends_on_past parameter of DAGs
(e.g. for Backfills)
:type ignore_depends_on_past: bool
:param ignore_task_deps: Ignore task-specific dependencies such as depends_on_past
and trigger rule
:type ignore_task_deps: bool
:param ignore_ti_state: Ignore the task instance's previous failure/success
:type ignore_ti_state: bool
:param local: Whether to run the task locally
:type local: bool
:param pickle_id: If the DAG was serialized to the DB, the ID
associated with the pickled DAG
:type pickle_id: unicode
:param file_path: path to the file containing the DAG definition
:param raw: raw mode (needs more details)
:param job_id: job ID (needs more details)
:param pool: the Airflow pool that the task should run in
:type pool: unicode
:param cfg_path: the path to the configuration file
:type cfg_path: basestring
:return: shell command that can be used to run the task instance
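A minimal sketch of generating the command for a task instance; the DAG id, task id,
and date below are placeholders:

.. code-block:: python

    from datetime import datetime
    from airflow.models.taskinstance import TaskInstance

    command = TaskInstance.generate_command(
        dag_id='example_dag',                 # placeholder DAG id
        task_id='example_task',               # placeholder task id
        execution_date=datetime(2020, 1, 1),
        local=True,
    )
    # The result drives the `airflow run` CLI; its exact shape
    # (string vs. list of arguments) depends on the Airflow version.
    print(command)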
.. method:: current_state(self, session=None)
Get the very latest state from the database. If a session is passed,
we use it and looking up the state becomes part of the session;
otherwise a new session is used.
.. method:: error(self, session=None)
Forces the task instance's state to FAILED in the database.
.. method:: refresh_from_db(self, session=None, lock_for_update=False)
Refreshes the task instance from the database based on the primary key
:param lock_for_update: if True, indicates that the database should
lock the TaskInstance (issuing a FOR UPDATE clause) until the
session is committed.
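A hedged sketch of refreshing and locking a task instance while its state is changed;
the existing ``ti`` object and the session handling are assumptions for illustration:

.. code-block:: python

    from airflow import settings
    from airflow.utils.state import State

    session = settings.Session()
    # Re-read the row and hold a FOR UPDATE lock so no other scheduler
    # can change this task instance until the session commits.
    ti.refresh_from_db(session=session, lock_for_update=True)  # `ti` assumed to exist
    ti.set_state(State.FAILED, session=session, commit=False)  # example state change
    session.commit()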
.. method:: refresh_from_task(self, task, pool_override=None)
Copy common attributes from the given task.
:param task: The task object to copy from
:type task: airflow.models.BaseOperator
:param pool_override: Use the pool_override instead of task's pool
:type pool_override: str
.. method:: clear_xcom_data(self, session=None)
Clears all XCom data from the database for the task instance
.. method:: set_state(self, state, session=None, commit=True)
.. method:: are_dependents_done(self, session=None)
Checks whether the dependents of this task instance have all succeeded.
This is meant to be used by wait_for_downstream.
This is useful when you do not want to start processing the next
schedule of a task until the dependents are done. For instance,
if the task DROPs and recreates a table.
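A hedged sketch of the operator-level setting that relies on this check; the task id,
callable, and DAG object are placeholders:

.. code-block:: python

    from airflow.operators.python_operator import PythonOperator

    # wait_for_downstream=True makes the next schedule of this task wait
    # until the direct downstream tasks of the previous schedule succeeded,
    # which is what are_dependents_done() verifies.
    rebuild_table = PythonOperator(
        task_id='rebuild_table',      # placeholder task id
        python_callable=rebuild,      # assumed callable that drops and recreates a table
        wait_for_downstream=True,
        dag=dag,                      # assumed DAG object
    )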
.. method:: _get_previous_ti(self, state=None, session=None)
.. method:: are_dependencies_met(self, dep_context=None, session=None, verbose=False)
Returns whether or not all the conditions are met for this task instance to be run
given the context for the dependencies (e.g. a task instance being force run from
the UI will ignore some dependencies).
:param dep_context: The execution context that determines the dependencies that
should be evaluated.
:type dep_context: DepContext
:param session: database session
:type session: sqlalchemy.orm.session.Session
:param verbose: whether to log details of failed dependencies at
info or debug log level
:type verbose: bool
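A hedged sketch of evaluating dependencies outside the scheduler; the ``ti`` object and
session are assumed to already exist:

.. code-block:: python

    from airflow.ti_deps.dep_context import DepContext

    # Returns True only if every dependency in the given context is met;
    # verbose=True logs failed dependencies at info level.
    if ti.are_dependencies_met(dep_context=DepContext(), session=session, verbose=True):
        print("Task instance %s is ready to run" % ti)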
.. method:: get_failed_dep_statuses(self, dep_context=None, session=None)
.. method:: __repr__(self)
.. method:: next_retry_datetime(self)
Get datetime of the next retry if the task instance fails. For exponential
backoff, retry_delay is used as base and will be converted to seconds.
.. method:: ready_for_retry(self)
Checks on whether the task instance is in the right state and timeframe
to be retried.
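A hedged sketch of checking whether a failed task instance can be retried yet; ``ti`` is
assumed to be an instance in the UP_FOR_RETRY state:

.. code-block:: python

    if ti.ready_for_retry():
        print("Retry interval has elapsed; the task can be retried now")
    else:
        # next_retry_datetime() applies retry_delay, using exponential
        # backoff when the task enables it.
        print("Next retry no earlier than %s" % ti.next_retry_datetime())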
.. method:: pool_full(self, session)
Returns a boolean as to whether the slot pool has room for this
task to run
.. method:: get_dagrun(self, session)
Returns the DagRun for this TaskInstance
:param session:
:return: DagRun
.. method:: _check_and_change_state_before_execution(self, verbose=True, ignore_all_deps=False, ignore_depends_on_past=False, ignore_task_deps=False, ignore_ti_state=False, mark_success=False, test_mode=False, job_id=None, pool=None, session=None)
Checks dependencies and then sets state to RUNNING if they are met. Returns
True if and only if state is set to RUNNING, which implies that task should be
executed, in preparation for _run_raw_task
:param verbose: whether to turn on more verbose logging
:type verbose: bool
:param ignore_all_deps: Ignore all of the non-critical dependencies and just run the task
:type ignore_all_deps: bool
:param ignore_depends_on_past: Ignore depends_on_past DAG attribute
:type ignore_depends_on_past: bool
:param ignore_task_deps: Don't check the dependencies of this TI's task
:type ignore_task_deps: bool
:param ignore_ti_state: Disregards previous task instance state
:type ignore_ti_state: bool
:param mark_success: Don't run the task, mark its state as success
:type mark_success: bool
:param test_mode: Doesn't record success or failure in the DB
:type test_mode: bool
:param pool: specifies the pool to use to run the task instance
:type pool: str
:return: whether the state was changed to running or not
:rtype: bool
.. method:: _run_raw_task(self, mark_success=False, test_mode=False, job_id=None, pool=None, session=None)
Immediately runs the task (without checking or changing db state
before execution) and then sets the appropriate final state after
completion and runs any post-execute callbacks. Meant to be called
only after another function changes the state to running.
:param mark_success: Don't run the task, mark its state as success
:type mark_success: bool
:param test_mode: Doesn't record success or failure in the DB
:type test_mode: bool
:param pool: specifies the pool to use to run the task instance
:type pool: str
.. method:: run(self, verbose=True, ignore_all_deps=False, ignore_depends_on_past=False, ignore_task_deps=False, ignore_ti_state=False, mark_success=False, test_mode=False, job_id=None, pool=None, session=None)
.. method:: dry_run(self)
.. method:: _handle_reschedule(self, actual_start_date, reschedule_exception, test_mode=False, context=None, session=None)
.. method:: handle_failure(self, error, test_mode=None, context=None, force_fail=False, session=None)
.. method:: is_eligible_to_retry(self)
Is the task instance eligible for retry
.. method:: _safe_date(self, date_attr, fmt)
.. method:: get_template_context(self, session=None)
.. method:: get_rendered_template_fields(self)
Fetch rendered template fields from the DB if DAG serialization is enabled.
Otherwise just render the templates
.. method:: overwrite_params_with_dag_run_conf(self, params, dag_run)
.. method:: render_templates(self, context=None)
Render templates in the operator fields.
.. method:: email_alert(self, exception)
.. method:: set_duration(self)
.. method:: xcom_push(self, key, value, execution_date=None)
Make an XCom available for tasks to pull.
:param key: A key for the XCom
:type key: str
:param value: A value for the XCom. The value is pickled and stored
in the database.
:type value: any pickleable object
:param execution_date: if provided, the XCom will not be visible until
this date. This can be used, for example, to send a message to a
task on a future date without it being immediately visible.
:type execution_date: datetime
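A minimal example of pushing a value from inside a task; the callable and key name are
illustrative:

.. code-block:: python

    def extract(**context):
        # Push a value under an explicit key; downstream tasks of the
        # same dag run can pull it once it is stored.
        context['ti'].xcom_push(key='row_count', value=42)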
.. method:: xcom_pull(self, task_ids=None, dag_id=None, key=XCOM_RETURN_KEY, include_prior_dates=False)
Pull XComs that optionally meet certain criteria.
The default value for `key` limits the search to XComs
that were returned by other tasks (as opposed to those that were pushed
manually). To remove this filter, pass key=None (or any desired value).
If a single task_id string is provided, the result is the value of the
most recent matching XCom from that task_id. If multiple task_ids are
provided, a tuple of matching values is returned. None is returned
whenever no matches are found.
:param key: A key for the XCom. If provided, only XComs with matching
keys will be returned. The default key is 'return_value', also
available as a constant XCOM_RETURN_KEY. This key is automatically
given to XComs returned by tasks (as opposed to being pushed
manually). To remove the filter, pass key=None.
:type key: str
:param task_ids: Only XComs from tasks with matching ids will be
pulled. Can pass None to remove the filter.
:type task_ids: str or iterable of strings (representing task_ids)
:param dag_id: If provided, only pulls XComs from this DAG.
If None (default), the DAG of the calling task is used.
:type dag_id: str
:param include_prior_dates: If False, only XComs from the current
execution_date are returned. If True, XComs from previous dates
are returned as well.
:type include_prior_dates: bool
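A minimal example of pulling values pushed by upstream tasks; the task ids and key are
illustrative:

.. code-block:: python

    def load(**context):
        ti = context['ti']
        # Single task id: the most recent matching value from that task.
        row_count = ti.xcom_pull(task_ids='extract', key='row_count')
        # Multiple task ids: a tuple of matching values.
        values = ti.xcom_pull(task_ids=['extract', 'transform'])
        print(row_count, values)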
.. method:: get_num_running_task_instances(self, session)
.. method:: init_run_context(self, raw=False)
Sets the log context.