| :py:mod:`airflow.sensors.smart_sensor` |
| ====================================== |
| |
| .. py:module:: airflow.sensors.smart_sensor |
| |
| |
| Module Contents |
| --------------- |
| |
| Classes |
| ~~~~~~~ |
| |
| .. autoapisummary:: |
| |
| airflow.sensors.smart_sensor.SensorWork |
| airflow.sensors.smart_sensor.CachedPokeWork |
| airflow.sensors.smart_sensor.SensorExceptionInfo |
| airflow.sensors.smart_sensor.SmartSensorOperator |
| |
| |
| |
| |
| Attributes |
| ~~~~~~~~~~ |
| |
| .. autoapisummary:: |
| |
| airflow.sensors.smart_sensor.config |
| airflow.sensors.smart_sensor.handler_config |
| airflow.sensors.smart_sensor.formatter_config |
| airflow.sensors.smart_sensor.dictConfigurator |
| |
| |
| .. py:data:: config |
| |
| |
| |
| |
| .. py:data:: handler_config |
| |
| |
| |
| |
| .. py:data:: formatter_config |
| |
| |
| |
| |
| .. py:data:: dictConfigurator |
| |
| |
| |
| |
| .. py:class:: SensorWork(si) |
| |
| Stores a unit of sensor work with its decoded context values. It is used only |
| inside the smart sensor and is created from a sensor instance record. |
| A sensor work object has the following attributes: |
| `dag_id`: sensor_instance dag_id. |
| `task_id`: sensor_instance task_id. |
| `execution_date`: sensor_instance execution_date. |
| `try_number`: sensor_instance try_number. |
| `poke_context`: Decoded poke_context for the sensor task. |
| `execution_context`: Decoded execution_context. |
| `hashcode`: The signature of the poking job. |
| `operator`: The sensor operator class. |
| `op_classpath`: The sensor operator class path. |
| `encoded_poke_context`: The raw data from the sensor_instance poke_context column. |
| `log`: The sensor work logger, which mocks the corresponding task instance log. |
| |
| :param si: The sensor_instance ORM object. |
| |
| .. py:method:: __eq__(self, other) |
| |
| Return self==value. |
| |
| |
| .. py:method:: create_new_task_handler() |
| :staticmethod: |
| |
| Create task log handler for a sensor work. |
| :return: log handler |
| |
| |
| .. py:method:: close_sensor_logger(self) |
| |
| Close log handler for a sensor work. |
| |
| |
| .. py:method:: ti_key(self) |
| :property: |
| |
| Key for the task instance that maps to the sensor work. |
| |
| |
| .. py:method:: cache_key(self) |
| :property: |
| |
| Key used to query in smart sensor for cached sensor work. |
| |
| |
| |
| .. py:class:: CachedPokeWork |
| |
| Wrapper class for the poke work inside the smart sensor. It saves |
| the sensor_task used to poke and the most recent poke result state. |
| state: The poke state. |
| sensor_task: The cached object for executing the poke function. |
| last_poke_time: The last time this cached work was called. |
| to_flush: Whether the cached work should be flushed. |
| |
| .. py:method:: set_state(self, state) |
| |
| Set state for cached poke work. |
| :param state: The sensor_instance state. |
| |
| |
| .. py:method:: clear_state(self) |
| |
| Clear state for cached poke work. |
| |
| |
| .. py:method:: set_to_flush(self) |
| |
| Mark this poke work to be popped from the cached dict after the current loop. |
| |
| |
| .. py:method:: is_expired(self) |
| |
| The cached task object expires if there is no poke for 20 minutes. |
| :return: Boolean |
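The expiry rule described above can be sketched in plain Python. This is a minimal illustration and not the Airflow implementation: the class name ``CachedPokeSketch``, the ``now`` arguments, and the assumption that a work item flagged with ``set_to_flush`` also counts as expired are all hypothetical. Only the 20 minute threshold comes from the description above.

```python
from datetime import datetime, timedelta, timezone

# Threshold taken from the is_expired description above.
EXPIRY = timedelta(minutes=20)


class CachedPokeSketch:
    """Hypothetical stand-in for CachedPokeWork; not the Airflow class."""

    def __init__(self, now=None):
        self.state = None
        self.sensor_task = None
        # `now` is injectable only to keep the sketch deterministic.
        self.last_poke_time = now or datetime.now(timezone.utc)
        self.to_flush = False

    def set_state(self, state):
        self.state = state

    def clear_state(self):
        self.state = None

    def set_to_flush(self):
        self.to_flush = True

    def is_expired(self, now=None):
        now = now or datetime.now(timezone.utc)
        # Assumption: a flagged item is treated as expired immediately.
        return self.to_flush or (now - self.last_poke_time) > EXPIRY
```

With a fixed start time, an item that was last poked 19 minutes ago is still live, one last poked 21 minutes ago is expired, and a flagged item is expired regardless of age.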
| |
| |
| |
| .. py:class:: SensorExceptionInfo(exception_info, is_infra_failure=False, infra_failure_retry_window=datetime.timedelta(minutes=130)) |
| |
| Holds sensor exception information and the type of exception. For a possibly transient |
| infra failure, the task is given more chances to retry before it is failed. |
| |
| .. py:method:: set_latest_exception(self, exception_info, is_infra_failure=False) |
| |
| Set the latest exception information for the sensor exception. If the exception |
| implies an infra failure, this function checks the recorded infra failure timeout, |
| which is set when the first infra failure exception arrives. Until that timeout |
| passes, the sensor can retry without failing the current run. The window length |
| comes from infra_failure_retry_window (130 minutes by default, per the |
| constructor signature). |
| |
| :param exception_info: Details of the exception information. |
| :param is_infra_failure: Whether the current exception was caused by a transient infra failure. |
| Within the retry window _infra_failure_retry_window, the smart sensor |
| retries the poke function without failing the current task run. |
| |
| |
| .. py:method:: set_infra_failure_timeout(self) |
| |
| Set the point in time at which the sensor should be failed if it keeps hitting |
| infra failures. |
| :return: |
| |
| |
| .. py:method:: should_fail_current_run(self) |
| |
| :return: Whether the sensor should fail. |
| :rtype: boolean |
| |
| |
| .. py:method:: exception_info(self) |
| :property: |
| |
| :return: The exception message. |
| |
| |
| .. py:method:: is_infra_failure(self) |
| :property: |
| |
| :return: Whether the exception is an infra failure. |
| :rtype: boolean |
| |
| |
| .. py:method:: is_expired(self) |
| |
| :return: Whether the current exception record has expired and no longer needs to be kept. |
| :rtype: boolean |
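The retry window behaviour described for this class can be sketched as follows. This is an illustration under stated assumptions, not the Airflow source: ``ExceptionInfoSketch`` and the injectable ``now`` arguments are hypothetical, and the rule that a non-infra exception fails the run immediately is an assumption drawn from the descriptions above. The 130 minute default comes from the constructor signature.

```python
from datetime import datetime, timedelta, timezone


class ExceptionInfoSketch:
    """Hypothetical stand-in for SensorExceptionInfo; not the Airflow class."""

    def __init__(self, exception_info, is_infra_failure=False,
                 infra_failure_retry_window=timedelta(minutes=130), now=None):
        self._exception_info = exception_info
        self._is_infra_failure = is_infra_failure
        self._infra_failure_retry_window = infra_failure_retry_window
        self._infra_failure_timeout = None
        self.set_infra_failure_timeout(now)

    def set_infra_failure_timeout(self, now=None):
        now = now or datetime.now(timezone.utc)
        if not self._is_infra_failure:
            # A non-infra exception clears any recorded deadline.
            self._infra_failure_timeout = None
        elif self._infra_failure_timeout is None:
            # The deadline is fixed when the first infra failure arrives.
            self._infra_failure_timeout = now + self._infra_failure_retry_window

    def set_latest_exception(self, exception_info, is_infra_failure=False, now=None):
        self._exception_info = exception_info
        self._is_infra_failure = is_infra_failure
        self.set_infra_failure_timeout(now)

    def should_fail_current_run(self, now=None):
        now = now or datetime.now(timezone.utc)
        # Assumption: only infra failures get the retry window.
        return (not self._is_infra_failure) or now > self._infra_failure_timeout
```

In this sketch a second infra failure inside the window does not move the deadline, so repeated transient errors cannot keep a run alive indefinitely.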
| |
| |
| |
| .. py:class:: SmartSensorOperator(poke_interval=180, smart_sensor_timeout=60 * 60 * 24 * 7, soft_fail=False, shard_min=0, shard_max=100000, poke_timeout=6.0, *args, **kwargs) |
| |
| Bases: :py:obj:`airflow.models.BaseOperator`, :py:obj:`airflow.models.SkipMixin` |
| |
| Smart sensor operators are derived from this class. |
| |
| Smart sensor operators keep refreshing a dictionary of qualified active sensor |
| tasks by querying the DB. Unlike a regular sensor operator, a smart sensor |
| operator pokes for all sensor tasks in the dictionary at a fixed time interval. |
| When a criterion is met or a task fails by timing out, it updates the state of |
| the corresponding sensor tasks in the task_instance table. |
| |
| :param soft_fail: Set to true to mark the task as SKIPPED on failure |
| :type soft_fail: bool |
| :param poke_interval: Time in seconds that the job should wait |
| between tries. |
| :type poke_interval: int |
| :param smart_sensor_timeout: Time in seconds before the internal sensor |
| job times out if poke_timeout is not defined. |
| :type smart_sensor_timeout: float |
| :param shard_min: shard code lower bound (inclusive) |
| :type shard_min: int |
| :param shard_max: shard code upper bound (exclusive) |
| :type shard_max: int |
| :param poke_timeout: Time in seconds before the task times out and fails. |
| :type poke_timeout: float |
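The ``shard_min`` and ``shard_max`` parameters describe a half-open range: the lower bound is inclusive and the upper bound is exclusive. The sketch below shows what that implies when several smart sensor tasks divide the shard code space between them. Both helper functions are hypothetical and not part of Airflow. How shard codes are assigned to sensor instances is not covered here.

```python
def owns_shard(shard_code, shard_min=0, shard_max=100000):
    """Return True if shard_code falls in [shard_min, shard_max)."""
    return shard_min <= shard_code < shard_max


def split_shard_range(total, n):
    """Split [0, total) into at most n contiguous half-open ranges."""
    if total < 1 or n < 1:
        raise ValueError("total and n must be positive")
    step = -(-total // n)  # ceiling division
    return [(lo, min(lo + step, total)) for lo in range(0, total, step)]


# Four hypothetical smart sensor tasks covering the default code space.
ranges = split_shard_range(100000, 4)
```

Because each range excludes its upper bound, a boundary code such as 25000 belongs to exactly one range, so no sensor instance is picked up by two smart sensor tasks.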
| |
| .. py:attribute:: ui_color |
| :annotation: = #e6f1f2 |
| |
| |
| |
| .. py:method:: flush_cached_sensor_poke_results(self) |
| |
| Flush outdated cached sensor states saved in previous loop. |
| |
| |
| .. py:method:: poke(self, sensor_work) |
| |
| Function that the sensors defined while deriving this class should |
| override. |
| |
| |
| |
| .. py:method:: execute(self, context) |
| |
| This is the main method to derive when creating an operator. |
| Context is the same dictionary used as when rendering jinja templates. |
| |
| Refer to get_template_context for more context. |
| |
| |
| .. py:method:: on_kill(self) |
| |
| Override this method to clean up subprocesses when a task instance |
| gets killed. Any use of the threading, subprocess or multiprocessing |
| module within an operator needs to be cleaned up, or it will leave |
| ghost processes behind. |
| |
| |
| |