:mod:`airflow.sensors.smart_sensor`
===================================
.. py:module:: airflow.sensors.smart_sensor
Module Contents
---------------
.. data:: config
.. data:: handler_config
.. data:: formatter_config
.. data:: dictConfigurator
.. py:class:: SensorWork(si)
Stores a sensor work with its decoded context values. It is used only
inside the smart sensor. A sensor work is created from a sensor_instance record.
A sensor work object has the following attributes:
`dag_id`: sensor_instance dag_id.
`task_id`: sensor_instance task_id.
`execution_date`: sensor_instance execution_date.
`try_number`: sensor_instance try_number.
`poke_context`: Decoded poke_context for the sensor task.
`execution_context`: Decoded execution_context.
`hashcode`: The signature of the poking job.
`operator`: The sensor operator class.
`op_classpath`: The sensor operator class path.
`encoded_poke_context`: The raw data from sensor_instance poke_context column.
`log`: The sensor work logger which will mock the corresponding task instance log.
:param si: The sensor_instance ORM object.
.. attribute:: ti_key
Key for the task instance that maps to the sensor work.
.. attribute:: cache_key
Key used to look up the cached sensor work in the smart sensor.
.. method:: __eq__(self, other)
.. staticmethod:: create_new_task_handler()
Create task log handler for a sensor work.
:return: log handler
.. method:: _get_sensor_logger(self, si)
Return logger for a sensor instance object.
.. method:: close_sensor_logger(self)
Close log handler for a sensor work.
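As a rough illustration of how the identifying attributes of `SensorWork` could fit together, the following standalone sketch models a sensor work without importing Airflow. The class name `SensorWorkSketch`, the helper `poke_hash`, the operator path `my.sensors.FileSensor`, and the JSON-plus-SHA-256 hashing scheme are all assumptions made for illustration; the real module may compute `hashcode` and `cache_key` differently.

```python
import hashlib
import json
from dataclasses import dataclass, field
from datetime import datetime


def poke_hash(poke_context: dict) -> str:
    """Hypothetical signature: hash of the key-sorted JSON form of the context."""
    payload = json.dumps(poke_context, sort_keys=True, default=str)
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()


@dataclass
class SensorWorkSketch:
    """Standalone stand-in for SensorWork; not the Airflow implementation."""

    dag_id: str
    task_id: str
    execution_date: datetime
    try_number: int
    op_classpath: str
    poke_context: dict = field(default_factory=dict)

    @property
    def ti_key(self) -> tuple:
        # Identifies the task instance that this sensor work maps to.
        return (self.dag_id, self.task_id, self.execution_date)

    @property
    def cache_key(self) -> tuple:
        # Works with the same operator and poke context share one cache entry.
        return (self.op_classpath, poke_hash(self.poke_context))


# Two tasks in different DAGs that poke for the same thing.
a = SensorWorkSketch("dag_a", "wait", datetime(2021, 1, 1), 1,
                     "my.sensors.FileSensor", {"path": "/tmp/x", "fs": "local"})
b = SensorWorkSketch("dag_b", "wait", datetime(2021, 1, 1), 1,
                     "my.sensors.FileSensor", {"fs": "local", "path": "/tmp/x"})
```

In this sketch the two works have different `ti_key` values but the same `cache_key`, which is the property that would let a single poke serve both tasks.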
.. py:class:: CachedPokeWork
Wrapper class for the poke work inside the smart sensor. It saves
the sensor_task used to poke and the most recent poke result state.
state: The poke state.
sensor_task: The cached object for executing the poke function.
last_poke_time: The latest time this cached work was called.
to_flush: Whether the cached work should be flushed.
.. method:: set_state(self, state)
Set state for cached poke work.
:param state: The sensor_instance state.
.. method:: clear_state(self)
Clear state for cached poke work.
.. method:: set_to_flush(self)
Mark this poke work to be popped from cached dict after current loop.
.. method:: is_expired(self)
The cached task object expires if there is no poke for 20 minutes.
:return: Boolean
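The expiry rule for `CachedPokeWork` can be pictured with a small standalone class. This is only a sketch: the name `CachedPokeWorkSketch`, the extra `now` parameter, and the choice to treat a never-poked work as not expired are assumptions, and the real class may also take `to_flush` into account.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional

EXPIRY = timedelta(minutes=20)  # the 20-minute window stated above


class CachedPokeWorkSketch:
    """Standalone stand-in for CachedPokeWork; not the Airflow implementation."""

    def __init__(self) -> None:
        self.state: Optional[str] = None
        self.sensor_task = None
        self.last_poke_time: Optional[datetime] = None
        self.to_flush = False

    def set_state(self, state: str) -> None:
        self.state = state

    def clear_state(self) -> None:
        self.state = None

    def set_to_flush(self) -> None:
        self.to_flush = True

    def is_expired(self, now: Optional[datetime] = None) -> bool:
        # `now` is an extra parameter added only to make the check testable.
        now = now or datetime.now(timezone.utc)
        # Assumption: a work that has never poked is not treated as expired.
        if self.last_poke_time is None:
            return False
        return now - self.last_poke_time > EXPIRY


t0 = datetime(2021, 1, 1, tzinfo=timezone.utc)
work = CachedPokeWorkSketch()
work.last_poke_time = t0
fresh = work.is_expired(t0 + timedelta(minutes=19))
stale = work.is_expired(t0 + timedelta(minutes=21))
```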
.. py:class:: SensorExceptionInfo(exception_info, is_infra_failure=False, infra_failure_retry_window=datetime.timedelta(minutes=130))
Holds sensor exception information and the type of exception. For a possibly transient
infra failure, the task is given more chances to retry before it is failed.
.. attribute:: exception_info
:return: exception msg.
.. attribute:: is_infra_failure
:return: If the exception is an infra failure
:type: boolean
.. method:: set_latest_exception(self, exception_info, is_infra_failure=False)
Set the latest exception information for the sensor exception. If the exception
implies an infra failure, check the recorded infra failure timeout, which was set
when the first infra failure exception arrived. Within the retry window given by
`infra_failure_retry_window` (130 minutes by default, per the class signature),
the poke is retried without failing the current run.
:param exception_info: Details of the exception information.
:param is_infra_failure: If current exception was caused by transient infra failure.
There is a retry window _infra_failure_retry_window that the smart sensor will
retry poke function without failing current task run.
.. method:: set_infra_failure_timeout(self)
Set the point in time at which the sensor should be failed if it keeps hitting
infra failures.
:return:
.. method:: should_fail_current_run(self)
:return: Should the sensor fail
:type: boolean
.. method:: is_expired(self)
:return: Whether the current exception needs to be kept.
:type: boolean
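The retry-window behaviour of `SensorExceptionInfo` can be sketched as follows. This is a simplified, standalone model under stated assumptions: the class name `SensorExceptionInfoSketch` and the `now` parameters are hypothetical, and the rules that a non-infra exception fails the run immediately and clears the recorded deadline are guesses based on the descriptions above, not a copy of the Airflow code.

```python
from datetime import datetime, timedelta, timezone


class SensorExceptionInfoSketch:
    """Standalone stand-in for SensorExceptionInfo; not the Airflow implementation."""

    def __init__(self, exception_info, is_infra_failure=False,
                 infra_failure_retry_window=timedelta(minutes=130), now=None):
        self.exception_info = exception_info
        self.is_infra_failure = is_infra_failure
        self._retry_window = infra_failure_retry_window
        self._infra_failure_timeout = None
        if is_infra_failure:
            self.set_infra_failure_timeout(now)

    def set_infra_failure_timeout(self, now=None):
        # The deadline is fixed at the first infra failure and kept afterwards.
        now = now or datetime.now(timezone.utc)
        if self._infra_failure_timeout is None:
            self._infra_failure_timeout = now + self._retry_window

    def set_latest_exception(self, exception_info, is_infra_failure=False, now=None):
        self.exception_info = exception_info
        self.is_infra_failure = is_infra_failure
        if is_infra_failure:
            self.set_infra_failure_timeout(now)
        else:
            # Assumption: a non-infra exception resets the recorded deadline.
            self._infra_failure_timeout = None

    def should_fail_current_run(self, now=None):
        now = now or datetime.now(timezone.utc)
        if not self.is_infra_failure:
            return True  # assumption: non-infra errors fail the run right away
        return now > self._infra_failure_timeout


t0 = datetime(2021, 1, 1, tzinfo=timezone.utc)
info = SensorExceptionInfoSketch("connection reset", is_infra_failure=True, now=t0)
# A second infra failure an hour later does not move the deadline (t0 + 130 min).
info.set_latest_exception("connection reset again", True, now=t0 + timedelta(hours=1))
within_window = info.should_fail_current_run(now=t0 + timedelta(hours=2))
past_window = info.should_fail_current_run(now=t0 + timedelta(hours=3))
```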
.. py:class:: SmartSensorOperator(poke_interval=180, smart_sensor_timeout=60 * 60 * 24 * 7, soft_fail=False, shard_min=0, shard_max=100000, poke_timeout=6.0, *args, **kwargs)
Bases: :class:`airflow.models.BaseOperator`, :class:`airflow.models.SkipMixin`
Smart sensor operators are derived from this class.
Smart sensor operators keep refreshing a dictionary of qualified active
sensor tasks by querying the database. Unlike a regular sensor operator,
a smart sensor operator pokes for all sensor tasks in the dictionary at
each time interval. When a criterion is met or a task fails by timing out,
it updates the state of all corresponding sensor tasks in the task_instance table.
:param soft_fail: Set to true to mark the task as SKIPPED on failure
:type soft_fail: bool
:param poke_interval: Time in seconds that the job should wait
between tries.
:type poke_interval: int
:param smart_sensor_timeout: Time in seconds before the internal sensor
job times out if poke_timeout is not defined.
:type smart_sensor_timeout: float
:param shard_min: shard code lower bound (inclusive)
:type shard_min: int
:param shard_max: shard code upper bound (exclusive)
:type shard_max: int
:param poke_timeout: Time in seconds before the task times out and fails.
:type poke_timeout: float
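The `shard_min` and `shard_max` parameters describe a half-open range of shard codes, inclusive at the lower bound and exclusive at the upper bound. The sketch below shows one way such ranges could be checked and divided among several smart sensor operators. The helper names `in_shard` and `split_shards` are hypothetical and are not part of this module.

```python
def in_shard(shardcode: int, shard_min: int = 0, shard_max: int = 100000) -> bool:
    """True if shardcode falls in [shard_min, shard_max)."""
    return shard_min <= shardcode < shard_max


def split_shards(total: int, n_operators: int) -> list:
    """Split [0, total) into contiguous (shard_min, shard_max) ranges."""
    step, extra = divmod(total, n_operators)
    ranges = []
    start = 0
    for i in range(n_operators):
        # The first `extra` ranges take one additional code each.
        end = start + step + (1 if i < extra else 0)
        ranges.append((start, end))
        start = end
    return ranges


ranges = split_shards(100000, 3)
# Every shard code belongs to exactly one range because the bounds are half-open.
owners = [bounds for bounds in ranges if in_shard(33334, *bounds)]
```

Because each upper bound equals the next lower bound, the ranges cover every shard code once with no overlap.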
.. attribute:: ui_color
:annotation: = #e6f1f2
.. method:: _validate_input_values(self)
.. method:: _load_sensor_works(self, session=None)
Refresh the sensor instances that need to be handled by this operator. Create the smart
sensor's internal objects from the information persisted in the sensor_instance table.
.. method:: _update_ti_hostname(self, sensor_works, session=None)
Update task instance hostname for new sensor works.
:param sensor_works: Smart sensor internal object for a sensor task.
:param session: The sqlalchemy session.
.. method:: _mark_multi_state(self, operator, poke_hash, encoded_poke_context, state, session=None)
Mark state for multiple tasks in the task_instance table to a new state if they have
the same signature as the poke_hash.
:param operator: The sensor's operator class name.
:param poke_hash: The hash code generated from sensor's poke context.
:param encoded_poke_context: The raw encoded poke_context.
:param state: Set multiple sensor tasks to this state.
:param session: The sqlalchemy session.
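The idea behind `_mark_multi_state` can be illustrated in memory, without a database. In this sketch the rows are plain dictionaries rather than ORM objects, the function name `mark_multi_state` and the row fields are hypothetical, and comparing the encoded poke context in addition to the hash is an assumption about why both values are passed.

```python
def mark_multi_state(rows, operator, poke_hash, encoded_poke_context, state):
    """Set `state` on every row sharing the given poke signature; return the count."""
    updated = 0
    for row in rows:
        if (row["operator"] == operator
                and row["hashcode"] == poke_hash
                and row["poke_context"] == encoded_poke_context):
            row["state"] = state
            updated += 1
    return updated


# Illustrative data only: two tasks share a signature, the third does not.
rows = [
    {"task_id": "wait_a", "operator": "FileSensor", "hashcode": 42,
     "poke_context": '{"path": "/data/x"}', "state": "running"},
    {"task_id": "wait_b", "operator": "FileSensor", "hashcode": 42,
     "poke_context": '{"path": "/data/x"}', "state": "running"},
    {"task_id": "wait_c", "operator": "FileSensor", "hashcode": 7,
     "poke_context": '{"path": "/data/y"}', "state": "running"},
]
changed = mark_multi_state(rows, "FileSensor", 42, '{"path": "/data/x"}', "success")
```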
.. method:: _retry_or_fail_task(self, sensor_work, error, session=None)
Change the state of a single sensor task. For a final state, set the end_date.
Because the smart sensor handles all retries in one process, a failed sensor task
has logically gone through all of its retries, so its try_number should be set to max_tries.
:param sensor_work: The sensor_work with exception.
:type sensor_work: SensorWork
:param error: The error message for this sensor_work.
:type error: str
:param session: The sqlalchemy session.
.. method:: _check_and_handle_ti_timeout(self, sensor_work)
Check whether a sensor task in the smart sensor has timed out. The timeout can be either
the sensor operator timeout or the general operator execution_timeout.
:param sensor_work: SensorWork
.. method:: _handle_poke_exception(self, sensor_work)
Fail the task if the accumulated exceptions exceed the retries.
:param sensor_work: SensorWork
.. method:: _process_sensor_work_with_cached_state(self, sensor_work, state)
.. method:: _execute_sensor_work(self, sensor_work)
.. method:: flush_cached_sensor_poke_results(self)
Flush outdated cached sensor states saved in previous loop.
.. method:: poke(self, sensor_work)
Function that the sensors defined while deriving this class should
override.
.. method:: _emit_loop_stats(self)
.. method:: execute(self, context)
.. method:: on_kill(self)