Add retry logic for KubernetesCreateResourceOperator and KubernetesJobOperator (#39201)
diff --git a/airflow/providers/cncf/kubernetes/hooks/kubernetes.py b/airflow/providers/cncf/kubernetes/hooks/kubernetes.py
index 1b3c425..8441b42 100644
--- a/airflow/providers/cncf/kubernetes/hooks/kubernetes.py
+++ b/airflow/providers/cncf/kubernetes/hooks/kubernetes.py
@@ -25,6 +25,7 @@
from typing import TYPE_CHECKING, Any, Generator
import aiofiles
+import tenacity
from asgiref.sync import sync_to_async
from kubernetes import client, config, watch
from kubernetes.config import ConfigException
@@ -35,6 +36,7 @@
from airflow.hooks.base import BaseHook
from airflow.models import Connection
from airflow.providers.cncf.kubernetes.kube_client import _disable_verify_ssl, _enable_tcp_keepalive
+from airflow.providers.cncf.kubernetes.kubernetes_helper_functions import should_retry_creation
from airflow.providers.cncf.kubernetes.utils.pod_manager import PodOperatorHookProtocol
from airflow.utils import yaml
@@ -486,6 +488,12 @@
except Exception as exc:
raise exc
+ @tenacity.retry(
+ stop=tenacity.stop_after_attempt(3),
+ wait=tenacity.wait_random_exponential(),
+ reraise=True,
+ retry=tenacity.retry_if_exception(should_retry_creation),
+ )
def create_job(
self,
job: V1Job,
diff --git a/airflow/providers/cncf/kubernetes/kubernetes_helper_functions.py b/airflow/providers/cncf/kubernetes/kubernetes_helper_functions.py
index c2e52b1..352ed59 100644
--- a/airflow/providers/cncf/kubernetes/kubernetes_helper_functions.py
+++ b/airflow/providers/cncf/kubernetes/kubernetes_helper_functions.py
@@ -23,6 +23,7 @@
import pendulum
from deprecated import deprecated
+from kubernetes.client.rest import ApiException
from slugify import slugify
from airflow.compat.functools import cache
@@ -181,3 +182,18 @@
else:
annotations_for_logging = "<omitted>"
return annotations_for_logging
+
+
+def should_retry_creation(exception: BaseException) -> bool:
+ """
+ Check if an Exception indicates a transient error and warrants retrying.
+
+    This function is needed to prevent the 'No agent available' error. The error appears from time to time
+    when users try to create a Resource or Job. This issue is inside Kubernetes and at the moment
+    has no solution. As a temporary solution, we decided to retry the Job or Resource creation request each
+    time this error appears.
+ More about this issue here: https://github.com/cert-manager/cert-manager/issues/6457
+ """
+ if isinstance(exception, ApiException):
+ return str(exception.status) == "500"
+ return False
diff --git a/airflow/providers/cncf/kubernetes/operators/resource.py b/airflow/providers/cncf/kubernetes/operators/resource.py
index 6ecbad6..417f754 100644
--- a/airflow/providers/cncf/kubernetes/operators/resource.py
+++ b/airflow/providers/cncf/kubernetes/operators/resource.py
@@ -22,12 +22,14 @@
from functools import cached_property
from typing import TYPE_CHECKING, Sequence
+import tenacity
import yaml
from kubernetes.utils import create_from_yaml
from airflow.exceptions import AirflowException
from airflow.models import BaseOperator
from airflow.providers.cncf.kubernetes.hooks.kubernetes import KubernetesHook
+from airflow.providers.cncf.kubernetes.kubernetes_helper_functions import should_retry_creation
from airflow.providers.cncf.kubernetes.utils.delete_from import delete_from_yaml
from airflow.providers.cncf.kubernetes.utils.k8s_resource_iterator import k8s_resource_iterator
@@ -126,7 +128,14 @@
else:
self.custom_object_client.create_cluster_custom_object(group, version, plural, body)
+ @tenacity.retry(
+ stop=tenacity.stop_after_attempt(3),
+ wait=tenacity.wait_random_exponential(),
+ reraise=True,
+ retry=tenacity.retry_if_exception(should_retry_creation),
+ )
def _create_objects(self, objects):
+ self.log.info("Starting resource creation")
if not self.custom_resource_definition:
create_from_yaml(
k8s_client=self.client,
@@ -144,6 +153,7 @@
self._create_objects(yaml.safe_load_all(stream))
else:
raise AirflowException("File %s not found", self.yaml_conf_file)
+ self.log.info("Resource was created")
class KubernetesDeleteResourceOperator(KubernetesResourceBaseOperator):
diff --git a/tests/providers/cncf/kubernetes/hooks/test_kubernetes.py b/tests/providers/cncf/kubernetes/hooks/test_kubernetes.py
index f02ccd1..c5aa62e 100644
--- a/tests/providers/cncf/kubernetes/hooks/test_kubernetes.py
+++ b/tests/providers/cncf/kubernetes/hooks/test_kubernetes.py
@@ -26,6 +26,7 @@
import kubernetes
import pytest
+from kubernetes.client.rest import ApiException
from kubernetes.config import ConfigException
from sqlalchemy.orm import make_transient
@@ -624,6 +625,44 @@
mock_sleep.assert_has_calls([mock.call(POLL_INTERVAL)] * 4)
assert job_actual == job_expected
+ @patch(f"{HOOK_MODULE}.json.dumps")
+ @patch(f"{HOOK_MODULE}.KubernetesHook.batch_v1_client")
+ def test_create_job_retries_on_500_error(self, mock_client, mock_json_dumps):
+ mock_client.create_namespaced_job.side_effect = [
+ ApiException(status=500),
+ MagicMock(),
+ ]
+
+ hook = KubernetesHook()
+ hook.create_job(job=mock.MagicMock())
+
+ assert mock_client.create_namespaced_job.call_count == 2
+
+ @patch(f"{HOOK_MODULE}.json.dumps")
+ @patch(f"{HOOK_MODULE}.KubernetesHook.batch_v1_client")
+ def test_create_job_fails_on_other_exception(self, mock_client, mock_json_dumps):
+ mock_client.create_namespaced_job.side_effect = [ApiException(status=404)]
+
+ hook = KubernetesHook()
+ with pytest.raises(ApiException):
+ hook.create_job(job=mock.MagicMock())
+
+ @patch(f"{HOOK_MODULE}.json.dumps")
+ @patch(f"{HOOK_MODULE}.KubernetesHook.batch_v1_client")
+ def test_create_job_retries_three_times(self, mock_client, mock_json_dumps):
+ mock_client.create_namespaced_job.side_effect = [
+ ApiException(status=500),
+ ApiException(status=500),
+ ApiException(status=500),
+ ApiException(status=500),
+ ]
+
+ hook = KubernetesHook()
+ with pytest.raises(ApiException):
+ hook.create_job(job=mock.MagicMock())
+
+ assert mock_client.create_namespaced_job.call_count == 3
+
class TestKubernetesHookIncorrectConfiguration:
@pytest.mark.parametrize(
diff --git a/tests/providers/cncf/kubernetes/operators/test_resource.py b/tests/providers/cncf/kubernetes/operators/test_resource.py
index d2cef33..ec7a8c8 100644
--- a/tests/providers/cncf/kubernetes/operators/test_resource.py
+++ b/tests/providers/cncf/kubernetes/operators/test_resource.py
@@ -16,10 +16,11 @@
# under the License.
from __future__ import annotations
-from unittest.mock import patch
+from unittest.mock import MagicMock, patch
import pytest
import yaml
+from kubernetes.client.rest import ApiException
from airflow import DAG
from airflow.providers.cncf.kubernetes.operators.resource import (
@@ -237,3 +238,61 @@
"resourceflavors",
"default-flavor-test",
)
+
+ @patch("kubernetes.config.load_kube_config")
+ @patch("airflow.providers.cncf.kubernetes.operators.resource.create_from_yaml")
+ def test_create_objects_retries_on_500_error(self, mock_create_from_yaml, mock_load_kube_config, context):
+ mock_create_from_yaml.side_effect = [
+ ApiException(status=500),
+ MagicMock(),
+ ]
+
+ op = KubernetesCreateResourceOperator(
+ yaml_conf=TEST_VALID_RESOURCE_YAML,
+ dag=self.dag,
+ kubernetes_conn_id="kubernetes_default",
+ task_id="test_task_id",
+ config_file="/foo/bar",
+ )
+ op.execute(context)
+
+ assert mock_create_from_yaml.call_count == 2
+
+ @patch("kubernetes.config.load_kube_config")
+ @patch("airflow.providers.cncf.kubernetes.operators.resource.create_from_yaml")
+ def test_create_objects_fails_on_other_exception(
+ self, mock_create_from_yaml, mock_load_kube_config, context
+ ):
+ mock_create_from_yaml.side_effect = [ApiException(status=404)]
+
+ op = KubernetesCreateResourceOperator(
+ yaml_conf=TEST_VALID_RESOURCE_YAML,
+ dag=self.dag,
+ kubernetes_conn_id="kubernetes_default",
+ task_id="test_task_id",
+ config_file="/foo/bar",
+ )
+ with pytest.raises(ApiException):
+ op.execute(context)
+
+ @patch("kubernetes.config.load_kube_config")
+ @patch("airflow.providers.cncf.kubernetes.operators.resource.create_from_yaml")
+ def test_create_objects_retries_three_times(self, mock_create_from_yaml, mock_load_kube_config, context):
+ mock_create_from_yaml.side_effect = [
+ ApiException(status=500),
+ ApiException(status=500),
+ ApiException(status=500),
+ ApiException(status=500),
+ ]
+
+ op = KubernetesCreateResourceOperator(
+ yaml_conf=TEST_VALID_RESOURCE_YAML,
+ dag=self.dag,
+ kubernetes_conn_id="kubernetes_default",
+ task_id="test_task_id",
+ config_file="/foo/bar",
+ )
+ with pytest.raises(ApiException):
+ op.execute(context)
+
+ assert mock_create_from_yaml.call_count == 3