:py:mod:`airflow.sensors.smart_sensor`
======================================
.. py:module:: airflow.sensors.smart_sensor
Module Contents
---------------
Classes
~~~~~~~
.. autoapisummary::
airflow.sensors.smart_sensor.SensorWork
airflow.sensors.smart_sensor.CachedPokeWork
airflow.sensors.smart_sensor.SensorExceptionInfo
airflow.sensors.smart_sensor.SmartSensorOperator
Attributes
~~~~~~~~~~
.. autoapisummary::
airflow.sensors.smart_sensor.config
airflow.sensors.smart_sensor.handler_config
airflow.sensors.smart_sensor.formatter_config
airflow.sensors.smart_sensor.dictConfigurator
.. py:data:: config
.. py:data:: handler_config
.. py:data:: formatter_config
.. py:data:: dictConfigurator
.. py:class:: SensorWork(si)
Stores a sensor work item with decoded context values. It is used only
inside the smart sensor and is created from a sensor instance record.
A sensor work object has the following attributes:
`dag_id`: sensor_instance dag_id.
`task_id`: sensor_instance task_id.
`execution_date`: sensor_instance execution_date.
`try_number`: sensor_instance try_number.
`poke_context`: Decoded poke_context for the sensor task.
`execution_context`: Decoded execution_context.
`hashcode`: The signature of the poking job.
`operator`: The sensor operator class.
`op_classpath`: The sensor operator class path.
`encoded_poke_context`: The raw data from the sensor_instance poke_context column.
`log`: The sensor work logger, which mocks the corresponding task instance log.
:param si: The sensor_instance ORM object.
.. py:method:: __eq__(self, other)
Return self==value.
.. py:method:: create_new_task_handler()
:staticmethod:
Create task log handler for a sensor work.
:return: log handler
.. py:method:: close_sensor_logger(self)
Close log handler for a sensor work.
.. py:method:: ti_key(self)
:property:
Key for the task instance that maps to the sensor work.
.. py:method:: cache_key(self)
:property:
Key used to query in smart sensor for cached sensor work.
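The role of a cache key can be illustrated with a minimal sketch. It assumes, purely for illustration, that the key combines the operator name with a canonical dump of the poke context; the helper ``make_cache_key`` and the sample records are hypothetical and are not part of ``airflow.sensors.smart_sensor``.

```python
import json
from collections import defaultdict


def make_cache_key(operator, poke_context):
    """Hypothetical key: operator name plus a canonical JSON dump of the poke context."""
    return (operator, json.dumps(poke_context, sort_keys=True))


# Three illustrative sensor works; the first two wait on the same condition.
works = [
    {"ti_key": ("dag_a", "wait_file", "2021-01-01"),
     "operator": "FileSensor", "poke_context": {"filepath": "/tmp/x"}},
    {"ti_key": ("dag_b", "wait_file", "2021-01-01"),
     "operator": "FileSensor", "poke_context": {"filepath": "/tmp/x"}},
    {"ti_key": ("dag_c", "wait_other", "2021-01-01"),
     "operator": "FileSensor", "poke_context": {"filepath": "/tmp/y"}},
]

# Group task instance keys by cache key so that each distinct condition
# needs to be poked only once per loop.
groups = defaultdict(list)
for work in works:
    key = make_cache_key(work["operator"], work["poke_context"])
    groups[key].append(work["ti_key"])
```

In this sketch the task instance key identifies a single task, while the cache key identifies the condition being poked, so several task instances can map to one cached poke.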
.. py:class:: CachedPokeWork
Wrapper class for the poke work inside the smart sensor. It saves
the sensor_task used to poke and the most recent poke result state.
state: The poke state.
sensor_task: The cached object for executing the poke function.
last_poke_time: The latest time this cached work was called.
to_flush: Whether the cached work should be flushed.
.. py:method:: set_state(self, state)
Set state for cached poke work.
:param state: The sensor_instance state.
.. py:method:: clear_state(self)
Clear state for cached poke work.
.. py:method:: set_to_flush(self)
Mark this poke work to be popped from cached dict after current loop.
.. py:method:: is_expired(self)
The cached task object expires if there is no poke for 20 minutes.
:return: Boolean
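The expiry rule can be sketched as follows. This is an illustrative stand-in rather than the Airflow class: the name ``CachedPokeWorkSketch`` is hypothetical, and the current time is passed in explicitly (an assumption made so the example is deterministic).

```python
from datetime import datetime, timedelta, timezone

EXPIRY = timedelta(minutes=20)  # the 20-minute limit stated above


class CachedPokeWorkSketch:
    """Illustrative stand-in for a cached poke work; not the Airflow class."""

    def __init__(self):
        self.state = None
        self.sensor_task = None
        self.last_poke_time = None
        self.to_flush = False

    def set_state(self, state):
        self.state = state

    def clear_state(self):
        self.state = None

    def set_to_flush(self):
        self.to_flush = True

    def is_expired(self, now):
        # Only the time-based rule is modelled here: a work that has been
        # poked expires once more than 20 minutes pass without another poke.
        return self.last_poke_time is not None and now - self.last_poke_time > EXPIRY


work = CachedPokeWorkSketch()
work.last_poke_time = datetime(2021, 1, 1, tzinfo=timezone.utc)
fresh = work.is_expired(work.last_poke_time + timedelta(minutes=19))
stale = work.is_expired(work.last_poke_time + timedelta(minutes=21))
```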
.. py:class:: SensorExceptionInfo(exception_info, is_infra_failure=False, infra_failure_retry_window=datetime.timedelta(minutes=130))
Holds sensor exception information and the type of exception. For a possibly transient
infra failure, the task gets more chances to retry before it is failed.
.. py:method:: set_latest_exception(self, exception_info, is_infra_failure=False)
Sets the latest exception information for the sensor exception. If the exception
implies an infra failure, this function checks the recorded infra failure timeout,
which was set when the first infra failure exception arrived. Retries within the
retry window (infra_failure_retry_window, 130 minutes by default) do not fail the current run.
:param exception_info: Details of the exception information.
:param is_infra_failure: Whether the current exception was caused by a transient infra failure.
During the retry window _infra_failure_retry_window, the smart sensor
retries the poke function without failing the current task run.
.. py:method:: set_infra_failure_timeout(self)
Set the point in time at which the sensor should be failed if it keeps
getting infra failures.
:return:
.. py:method:: should_fail_current_run(self)
:return: Whether the sensor should fail the current run.
:rtype: boolean
.. py:method:: exception_info(self)
:property:
:return: The exception message.
.. py:method:: is_infra_failure(self)
:property:
:return: Whether the exception is an infra failure.
:rtype: boolean
.. py:method:: is_expired(self)
:return: Whether the current exception needs to be kept.
:rtype: boolean
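The retry-window behaviour can be sketched as below. The class ``SensorExceptionInfoSketch`` is a hypothetical stand-in, not the Airflow implementation: it assumes the timeout is fixed at the first infra failure and cleared by any non-infra exception, and it takes the current time as an explicit ``now`` argument so the example is deterministic.

```python
from datetime import datetime, timedelta, timezone


class SensorExceptionInfoSketch:
    """Illustrative stand-in; not the Airflow class. The clock is passed in as `now`."""

    def __init__(self, exception_info, now, is_infra_failure=False,
                 infra_failure_retry_window=timedelta(minutes=130)):
        self._exception_info = exception_info
        self._is_infra_failure = is_infra_failure
        self._infra_failure_retry_window = infra_failure_retry_window
        self._infra_failure_timeout = None
        self._set_infra_failure_timeout(now)

    def _set_infra_failure_timeout(self, now):
        # Start the window at the first infra failure; clear it on any other exception.
        if not self._is_infra_failure:
            self._infra_failure_timeout = None
        elif self._infra_failure_timeout is None:
            self._infra_failure_timeout = now + self._infra_failure_retry_window

    def set_latest_exception(self, exception_info, now, is_infra_failure=False):
        self._exception_info = exception_info
        self._is_infra_failure = is_infra_failure
        self._set_infra_failure_timeout(now)

    def should_fail_current_run(self, now):
        # Non-infra exceptions fail at once; infra failures fail after the window has passed.
        return not self._is_infra_failure or now > self._infra_failure_timeout


start = datetime(2021, 1, 1, tzinfo=timezone.utc)
info = SensorExceptionInfoSketch("connection reset", now=start, is_infra_failure=True)
still_retrying = not info.should_fail_current_run(start + timedelta(minutes=60))
gave_up = info.should_fail_current_run(start + timedelta(minutes=131))
```

A repeated infra failure inside the window does not extend the deadline in this sketch, which matches the description that the timeout is set when the first infra failure arrives.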
.. py:class:: SmartSensorOperator(poke_interval=180, smart_sensor_timeout=60 * 60 * 24 * 7, soft_fail=False, shard_min=0, shard_max=100000, poke_timeout=6.0, *args, **kwargs)
Bases: :py:obj:`airflow.models.BaseOperator`, :py:obj:`airflow.models.SkipMixin`
Smart sensor operators are derived from this class.
Smart sensor operators keep refreshing a dictionary of qualified active
sensor tasks by querying the DB. Unlike a regular sensor operator, a
smart sensor operator pokes for all sensor tasks in the dictionary at
a fixed time interval. When a criterion is met or a task fails by timing out,
it updates the state of all affected sensor tasks in the task_instance table.
:param soft_fail: Set to true to mark the task as SKIPPED on failure
:type soft_fail: bool
:param poke_interval: Time in seconds that the job should wait
between tries.
:type poke_interval: int
:param smart_sensor_timeout: Time, in seconds, before the internal sensor
job times out if poke_timeout is not defined.
:type smart_sensor_timeout: float
:param shard_min: shard code lower bound (inclusive)
:type shard_min: int
:param shard_max: shard code upper bound (exclusive)
:type shard_max: int
:param poke_timeout: Time, in seconds, before the task times out and fails.
:type poke_timeout: float
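The half-open shard range (``shard_min`` inclusive, ``shard_max`` exclusive) can be illustrated with a small sketch. The helpers ``shard_ranges`` and ``owns`` are hypothetical and only show how contiguous ranges could be derived and checked; they are not how Airflow itself assigns shards.

```python
def shard_ranges(upper_limit, num_shards):
    """Split [0, upper_limit) into num_shards contiguous (shard_min, shard_max) pairs."""
    step, remainder = divmod(upper_limit, num_shards)
    ranges = []
    start = 0
    for index in range(num_shards):
        # Spread any remainder over the first few shards.
        end = start + step + (1 if index < remainder else 0)
        ranges.append((start, end))
        start = end
    return ranges


def owns(shard_code, shard_min, shard_max):
    """True if shard_code falls in the half-open range [shard_min, shard_max)."""
    return shard_min <= shard_code < shard_max


# 100000 matches the default shard_max in the signature above.
ranges = shard_ranges(100000, 5)
```

Because the upper bound is exclusive, a boundary value such as 20000 belongs to the second range only, so no sensor task is picked up by two operators in this sketch.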
.. py:attribute:: ui_color
:annotation: = #e6f1f2
.. py:method:: flush_cached_sensor_poke_results(self)
Flush outdated cached sensor states saved in previous loop.
.. py:method:: poke(self, sensor_work)
Sensors derived from this class should override this method.
.. py:method:: execute(self, context)
This is the main method to derive when creating an operator.
Context is the same dictionary used as when rendering jinja templates.
Refer to get_template_context for more context.
.. py:method:: on_kill(self)
Override this method to clean up subprocesses when a task instance
gets killed. Any use of the threading, subprocess or multiprocessing
module within an operator needs to be cleaned up, or it will leave
ghost processes behind.