Add retry logic for KubernetesCreateResourceOperator and KubernetesJobOperator (#39201)

diff --git a/airflow/providers/cncf/kubernetes/hooks/kubernetes.py b/airflow/providers/cncf/kubernetes/hooks/kubernetes.py
index 1b3c425..8441b42 100644
--- a/airflow/providers/cncf/kubernetes/hooks/kubernetes.py
+++ b/airflow/providers/cncf/kubernetes/hooks/kubernetes.py
@@ -25,6 +25,7 @@
 from typing import TYPE_CHECKING, Any, Generator
 
 import aiofiles
+import tenacity
 from asgiref.sync import sync_to_async
 from kubernetes import client, config, watch
 from kubernetes.config import ConfigException
@@ -35,6 +36,7 @@
 from airflow.hooks.base import BaseHook
 from airflow.models import Connection
 from airflow.providers.cncf.kubernetes.kube_client import _disable_verify_ssl, _enable_tcp_keepalive
+from airflow.providers.cncf.kubernetes.kubernetes_helper_functions import should_retry_creation
 from airflow.providers.cncf.kubernetes.utils.pod_manager import PodOperatorHookProtocol
 from airflow.utils import yaml
 
@@ -486,6 +488,12 @@
         except Exception as exc:
             raise exc
 
+    @tenacity.retry(
+        stop=tenacity.stop_after_attempt(3),
+        wait=tenacity.wait_random_exponential(),
+        reraise=True,
+        retry=tenacity.retry_if_exception(should_retry_creation),
+    )
     def create_job(
         self,
         job: V1Job,
diff --git a/airflow/providers/cncf/kubernetes/kubernetes_helper_functions.py b/airflow/providers/cncf/kubernetes/kubernetes_helper_functions.py
index c2e52b1..352ed59 100644
--- a/airflow/providers/cncf/kubernetes/kubernetes_helper_functions.py
+++ b/airflow/providers/cncf/kubernetes/kubernetes_helper_functions.py
@@ -23,6 +23,7 @@
 
 import pendulum
 from deprecated import deprecated
+from kubernetes.client.rest import ApiException
 from slugify import slugify
 
 from airflow.compat.functools import cache
@@ -181,3 +182,18 @@
     else:
         annotations_for_logging = "<omitted>"
     return annotations_for_logging
+
+
+def should_retry_creation(exception: BaseException) -> bool:
+    """
+    Check if an Exception indicates a transient error and warrants retrying.
+
+    This function is needed for preventing 'No agent available' error. The error appears time to time
+    when users try to create a Resource or Job. This issue is inside kubernetes and in the current moment
+    has no solution. Like a temporary solution we decided to retry Job or Resource creation request each
+    time when this error appears.
+    More about this issue here: https://github.com/cert-manager/cert-manager/issues/6457
+    """
+    if isinstance(exception, ApiException):
+        return str(exception.status) == "500"
+    return False
diff --git a/airflow/providers/cncf/kubernetes/operators/resource.py b/airflow/providers/cncf/kubernetes/operators/resource.py
index 6ecbad6..417f754 100644
--- a/airflow/providers/cncf/kubernetes/operators/resource.py
+++ b/airflow/providers/cncf/kubernetes/operators/resource.py
@@ -22,12 +22,14 @@
 from functools import cached_property
 from typing import TYPE_CHECKING, Sequence
 
+import tenacity
 import yaml
 from kubernetes.utils import create_from_yaml
 
 from airflow.exceptions import AirflowException
 from airflow.models import BaseOperator
 from airflow.providers.cncf.kubernetes.hooks.kubernetes import KubernetesHook
+from airflow.providers.cncf.kubernetes.kubernetes_helper_functions import should_retry_creation
 from airflow.providers.cncf.kubernetes.utils.delete_from import delete_from_yaml
 from airflow.providers.cncf.kubernetes.utils.k8s_resource_iterator import k8s_resource_iterator
 
@@ -126,7 +128,14 @@
         else:
             self.custom_object_client.create_cluster_custom_object(group, version, plural, body)
 
+    @tenacity.retry(
+        stop=tenacity.stop_after_attempt(3),
+        wait=tenacity.wait_random_exponential(),
+        reraise=True,
+        retry=tenacity.retry_if_exception(should_retry_creation),
+    )
     def _create_objects(self, objects):
+        self.log.info("Starting resource creation")
         if not self.custom_resource_definition:
             create_from_yaml(
                 k8s_client=self.client,
@@ -144,6 +153,7 @@
                 self._create_objects(yaml.safe_load_all(stream))
         else:
             raise AirflowException("File %s not found", self.yaml_conf_file)
+        self.log.info("Resource was created")
 
 
 class KubernetesDeleteResourceOperator(KubernetesResourceBaseOperator):
diff --git a/tests/providers/cncf/kubernetes/hooks/test_kubernetes.py b/tests/providers/cncf/kubernetes/hooks/test_kubernetes.py
index f02ccd1..c5aa62e 100644
--- a/tests/providers/cncf/kubernetes/hooks/test_kubernetes.py
+++ b/tests/providers/cncf/kubernetes/hooks/test_kubernetes.py
@@ -26,6 +26,7 @@
 
 import kubernetes
 import pytest
+from kubernetes.client.rest import ApiException
 from kubernetes.config import ConfigException
 from sqlalchemy.orm import make_transient
 
@@ -624,6 +625,44 @@
         mock_sleep.assert_has_calls([mock.call(POLL_INTERVAL)] * 4)
         assert job_actual == job_expected
 
+    @patch(f"{HOOK_MODULE}.json.dumps")
+    @patch(f"{HOOK_MODULE}.KubernetesHook.batch_v1_client")
+    def test_create_job_retries_on_500_error(self, mock_client, mock_json_dumps):
+        mock_client.create_namespaced_job.side_effect = [
+            ApiException(status=500),
+            MagicMock(),
+        ]
+
+        hook = KubernetesHook()
+        hook.create_job(job=mock.MagicMock())
+
+        assert mock_client.create_namespaced_job.call_count == 2
+
+    @patch(f"{HOOK_MODULE}.json.dumps")
+    @patch(f"{HOOK_MODULE}.KubernetesHook.batch_v1_client")
+    def test_create_job_fails_on_other_exception(self, mock_client, mock_json_dumps):
+        mock_client.create_namespaced_job.side_effect = [ApiException(status=404)]
+
+        hook = KubernetesHook()
+        with pytest.raises(ApiException):
+            hook.create_job(job=mock.MagicMock())
+
+    @patch(f"{HOOK_MODULE}.json.dumps")
+    @patch(f"{HOOK_MODULE}.KubernetesHook.batch_v1_client")
+    def test_create_job_retries_three_times(self, mock_client, mock_json_dumps):
+        mock_client.create_namespaced_job.side_effect = [
+            ApiException(status=500),
+            ApiException(status=500),
+            ApiException(status=500),
+            ApiException(status=500),
+        ]
+
+        hook = KubernetesHook()
+        with pytest.raises(ApiException):
+            hook.create_job(job=mock.MagicMock())
+
+        assert mock_client.create_namespaced_job.call_count == 3
+
 
 class TestKubernetesHookIncorrectConfiguration:
     @pytest.mark.parametrize(
diff --git a/tests/providers/cncf/kubernetes/operators/test_resource.py b/tests/providers/cncf/kubernetes/operators/test_resource.py
index d2cef33..ec7a8c8 100644
--- a/tests/providers/cncf/kubernetes/operators/test_resource.py
+++ b/tests/providers/cncf/kubernetes/operators/test_resource.py
@@ -16,10 +16,11 @@
 # under the License.
 from __future__ import annotations
 
-from unittest.mock import patch
+from unittest.mock import MagicMock, patch
 
 import pytest
 import yaml
+from kubernetes.client.rest import ApiException
 
 from airflow import DAG
 from airflow.providers.cncf.kubernetes.operators.resource import (
@@ -237,3 +238,61 @@
             "resourceflavors",
             "default-flavor-test",
         )
+
+    @patch("kubernetes.config.load_kube_config")
+    @patch("airflow.providers.cncf.kubernetes.operators.resource.create_from_yaml")
+    def test_create_objects_retries_on_500_error(self, mock_create_from_yaml, mock_load_kube_config, context):
+        mock_create_from_yaml.side_effect = [
+            ApiException(status=500),
+            MagicMock(),
+        ]
+
+        op = KubernetesCreateResourceOperator(
+            yaml_conf=TEST_VALID_RESOURCE_YAML,
+            dag=self.dag,
+            kubernetes_conn_id="kubernetes_default",
+            task_id="test_task_id",
+            config_file="/foo/bar",
+        )
+        op.execute(context)
+
+        assert mock_create_from_yaml.call_count == 2
+
+    @patch("kubernetes.config.load_kube_config")
+    @patch("airflow.providers.cncf.kubernetes.operators.resource.create_from_yaml")
+    def test_create_objects_fails_on_other_exception(
+        self, mock_create_from_yaml, mock_load_kube_config, context
+    ):
+        mock_create_from_yaml.side_effect = [ApiException(status=404)]
+
+        op = KubernetesCreateResourceOperator(
+            yaml_conf=TEST_VALID_RESOURCE_YAML,
+            dag=self.dag,
+            kubernetes_conn_id="kubernetes_default",
+            task_id="test_task_id",
+            config_file="/foo/bar",
+        )
+        with pytest.raises(ApiException):
+            op.execute(context)
+
+    @patch("kubernetes.config.load_kube_config")
+    @patch("airflow.providers.cncf.kubernetes.operators.resource.create_from_yaml")
+    def test_create_objects_retries_three_times(self, mock_create_from_yaml, mock_load_kube_config, context):
+        mock_create_from_yaml.side_effect = [
+            ApiException(status=500),
+            ApiException(status=500),
+            ApiException(status=500),
+            ApiException(status=500),
+        ]
+
+        op = KubernetesCreateResourceOperator(
+            yaml_conf=TEST_VALID_RESOURCE_YAML,
+            dag=self.dag,
+            kubernetes_conn_id="kubernetes_default",
+            task_id="test_task_id",
+            config_file="/foo/bar",
+        )
+        with pytest.raises(ApiException):
+            op.execute(context)
+
+        assert mock_create_from_yaml.call_count == 3