Disconnect GKE operators from deprecated hooks (#39434)
* Disconnect GKE operators from deprecated hooks
* Remove GKE unit tests from deprecation ignore list
diff --git a/airflow/providers/google/cloud/operators/kubernetes_engine.py b/airflow/providers/google/cloud/operators/kubernetes_engine.py
index f876622..0a28809 100644
--- a/airflow/providers/google/cloud/operators/kubernetes_engine.py
+++ b/airflow/providers/google/cloud/operators/kubernetes_engine.py
@@ -42,12 +42,8 @@
)
from airflow.providers.cncf.kubernetes.utils.pod_manager import OnFinishAction
from airflow.providers.google.cloud.hooks.kubernetes_engine import (
- GKECustomResourceHook,
- GKEDeploymentHook,
GKEHook,
- GKEJobHook,
GKEKubernetesHook,
- GKEPodHook,
)
from airflow.providers.google.cloud.links.kubernetes_engine import (
KubernetesEngineClusterLink,
@@ -533,13 +529,13 @@
)
@cached_property
- def deployment_hook(self) -> GKEDeploymentHook:
+ def deployment_hook(self) -> GKEKubernetesHook:
if self._cluster_url is None or self._ssl_ca_cert is None:
raise AttributeError(
- "Cluster url and ssl_ca_cert should be defined before using self.hook method. "
+ "Cluster url and ssl_ca_cert should be defined before using self.deployment_hook method. "
"Try to use self.get_kube_creds method",
)
- return GKEDeploymentHook(
+ return GKEKubernetesHook(
gcp_conn_id=self.gcp_conn_id,
impersonation_chain=self.impersonation_chain,
cluster_url=self._cluster_url,
@@ -547,13 +543,14 @@
)
@cached_property
- def pod_hook(self) -> GKEPodHook:
+ def pod_hook(self) -> GKEKubernetesHook:
if self._cluster_url is None or self._ssl_ca_cert is None:
raise AttributeError(
- "Cluster url and ssl_ca_cert should be defined before using self.hook method. "
+ "Cluster url and ssl_ca_cert should be defined before using self.pod_hook method. "
"Try to use self.get_kube_creds method",
)
- return GKEPodHook(
+
+ return GKEKubernetesHook(
gcp_conn_id=self.gcp_conn_id,
impersonation_chain=self.impersonation_chain,
cluster_url=self._cluster_url,
@@ -742,21 +739,20 @@
)
@cached_property
- def hook(self) -> GKEPodHook:
+ def hook(self) -> GKEKubernetesHook:
if self._cluster_url is None or self._ssl_ca_cert is None:
raise AttributeError(
"Cluster url and ssl_ca_cert should be defined before using self.hook method. "
"Try to use self.get_kube_creds method",
)
- hook = GKEPodHook(
+ return GKEKubernetesHook(
gcp_conn_id=self.gcp_conn_id,
cluster_url=self._cluster_url,
ssl_ca_cert=self._ssl_ca_cert,
impersonation_chain=self.impersonation_chain,
enable_tcp_keepalive=True,
)
- return hook
def execute(self, context: Context):
"""Execute process of creating pod and executing provided command inside it."""
@@ -901,19 +897,18 @@
)
@cached_property
- def hook(self) -> GKEJobHook:
+ def hook(self) -> GKEKubernetesHook:
if self._cluster_url is None or self._ssl_ca_cert is None:
raise AttributeError(
"Cluster url and ssl_ca_cert should be defined before using self.hook method. "
"Try to use self.get_kube_creds method",
)
- hook = GKEJobHook(
+ return GKEKubernetesHook(
gcp_conn_id=self.gcp_conn_id,
cluster_url=self._cluster_url,
ssl_ca_cert=self._ssl_ca_cert,
)
- return hook
def execute(self, context: Context):
"""Execute process of creating Job."""
@@ -1027,7 +1022,7 @@
)
@cached_property
- def hook(self) -> GKEJobHook:
+ def hook(self) -> GKEKubernetesHook:
self._cluster_url, self._ssl_ca_cert = GKEClusterAuthDetails(
cluster_name=self.cluster_name,
project_id=self.project_id,
@@ -1035,7 +1030,7 @@
cluster_hook=self.cluster_hook,
).fetch_cluster_info()
- return GKEJobHook(
+ return GKEKubernetesHook(
gcp_conn_id=self.gcp_conn_id,
cluster_url=self._cluster_url,
ssl_ca_cert=self._ssl_ca_cert,
@@ -1128,7 +1123,7 @@
)
@cached_property
- def hook(self) -> GKEJobHook:
+ def hook(self) -> GKEKubernetesHook:
self._cluster_url, self._ssl_ca_cert = GKEClusterAuthDetails(
cluster_name=self.cluster_name,
project_id=self.project_id,
@@ -1136,7 +1131,7 @@
cluster_hook=self.cluster_hook,
).fetch_cluster_info()
- return GKEJobHook(
+ return GKEKubernetesHook(
gcp_conn_id=self.gcp_conn_id,
cluster_url=self._cluster_url,
ssl_ca_cert=self._ssl_ca_cert,
@@ -1234,13 +1229,13 @@
)
@cached_property
- def hook(self) -> GKECustomResourceHook:
+ def hook(self) -> GKEKubernetesHook:
if self._cluster_url is None or self._ssl_ca_cert is None:
raise AttributeError(
"Cluster url and ssl_ca_cert should be defined before using self.hook method. "
"Try to use self.get_kube_creds method",
)
- return GKECustomResourceHook(
+ return GKEKubernetesHook(
gcp_conn_id=self.gcp_conn_id,
cluster_url=self._cluster_url,
ssl_ca_cert=self._ssl_ca_cert,
@@ -1336,13 +1331,13 @@
)
@cached_property
- def hook(self) -> GKECustomResourceHook:
+ def hook(self) -> GKEKubernetesHook:
if self._cluster_url is None or self._ssl_ca_cert is None:
raise AttributeError(
"Cluster url and ssl_ca_cert should be defined before using self.hook method. "
"Try to use self.get_kube_creds method",
)
- return GKECustomResourceHook(
+ return GKEKubernetesHook(
gcp_conn_id=self.gcp_conn_id,
cluster_url=self._cluster_url,
ssl_ca_cert=self._ssl_ca_cert,
@@ -1475,14 +1470,14 @@
)
@cached_property
- def hook(self) -> GKEJobHook:
+ def hook(self) -> GKEKubernetesHook:
if self._cluster_url is None or self._ssl_ca_cert is None:
raise AttributeError(
"Cluster url and ssl_ca_cert should be defined before using self.hook method. "
"Try to use self.get_kube_creds method",
)
- return GKEJobHook(
+ return GKEKubernetesHook(
gcp_conn_id=self.gcp_conn_id,
cluster_url=self._cluster_url,
ssl_ca_cert=self._ssl_ca_cert,
diff --git a/airflow/providers/google/cloud/triggers/kubernetes_engine.py b/airflow/providers/google/cloud/triggers/kubernetes_engine.py
index c0d8fef..8557bea 100644
--- a/airflow/providers/google/cloud/triggers/kubernetes_engine.py
+++ b/airflow/providers/google/cloud/triggers/kubernetes_engine.py
@@ -30,7 +30,6 @@
from airflow.providers.google.cloud.hooks.kubernetes_engine import (
GKEAsyncHook,
GKEKubernetesAsyncHook,
- GKEPodAsyncHook,
)
from airflow.triggers.base import BaseTrigger, TriggerEvent
@@ -147,8 +146,8 @@
)
@cached_property
- def hook(self) -> GKEPodAsyncHook: # type: ignore[override]
- return GKEPodAsyncHook(
+ def hook(self) -> GKEKubernetesAsyncHook: # type: ignore[override]
+ return GKEKubernetesAsyncHook(
cluster_url=self._cluster_url,
ssl_ca_cert=self._ssl_ca_cert,
gcp_conn_id=self.gcp_conn_id,
diff --git a/tests/deprecations_ignore.yml b/tests/deprecations_ignore.yml
index f1fafcc..91c05b3 100644
--- a/tests/deprecations_ignore.yml
+++ b/tests/deprecations_ignore.yml
@@ -517,25 +517,14 @@
- tests/providers/google/cloud/operators/test_dataproc.py::test_scale_cluster_operator_extra_links
- tests/providers/google/cloud/operators/test_dataproc.py::test_submit_spark_job_operator_extra_links
- tests/providers/google/cloud/operators/test_gcs.py::TestGoogleCloudStorageListOperator::test_execute__delimiter
-- tests/providers/google/cloud/operators/test_kubernetes_engine.py::TestGKEDeleteJobOperator::test_default_gcp_conn_id
-- tests/providers/google/cloud/operators/test_kubernetes_engine.py::TestGKEDeleteJobOperator::test_gcp_conn_id
-- tests/providers/google/cloud/operators/test_kubernetes_engine.py::TestGKEDescribeJobOperator::test_default_gcp_conn_id
-- tests/providers/google/cloud/operators/test_kubernetes_engine.py::TestGKEDescribeJobOperator::test_gcp_conn_id
- tests/providers/google/cloud/operators/test_kubernetes_engine.py::TestGKEPodOperator::test_cluster_info
- tests/providers/google/cloud/operators/test_kubernetes_engine.py::TestGKEPodOperator::test_config_file_throws_error
- tests/providers/google/cloud/operators/test_kubernetes_engine.py::TestGKEPodOperator::test_default_gcp_conn_id
- tests/providers/google/cloud/operators/test_kubernetes_engine.py::TestGKEPodOperator::test_execute
-- tests/providers/google/cloud/operators/test_kubernetes_engine.py::TestGKEPodOperator::test_execute_with_impersonation_service_account
-- tests/providers/google/cloud/operators/test_kubernetes_engine.py::TestGKEPodOperator::test_execute_with_impersonation_service_chain_one_element
- tests/providers/google/cloud/operators/test_kubernetes_engine.py::TestGKEPodOperator::test_gcp_conn_id
- tests/providers/google/cloud/operators/test_kubernetes_engine.py::TestGKEPodOperator::test_on_finish_action_handler
- tests/providers/google/cloud/operators/test_kubernetes_engine.py::TestGKEPodOperator::test_template_fields
- tests/providers/google/cloud/operators/test_kubernetes_engine.py::TestGKEPodOperatorAsync::test_async_create_pod_should_execute_successfully
-- tests/providers/google/cloud/operators/test_kubernetes_engine.py::TestGKEStartJobOperator::test_default_gcp_conn_id
-- tests/providers/google/cloud/operators/test_kubernetes_engine.py::TestGKEStartJobOperator::test_gcp_conn_id
-- tests/providers/google/cloud/operators/test_kubernetes_engine.py::TestGKEStartKueueInsideClusterOperator::test_execute
-- tests/providers/google/cloud/operators/test_kubernetes_engine.py::TestGKEStartKueueJobOperator::test_default_gcp_conn_id
-- tests/providers/google/cloud/operators/test_kubernetes_engine.py::TestGKEStartKueueJobOperator::test_gcp_conn_id
- tests/providers/google/cloud/operators/test_kubernetes_engine.py::TestGoogleCloudPlatformContainerOperator::test_create_execute
- tests/providers/google/cloud/operators/test_kubernetes_engine.py::TestGoogleCloudPlatformContainerOperator::test_create_execute_call_defer_method
- tests/providers/google/cloud/operators/test_kubernetes_engine.py::TestGoogleCloudPlatformContainerOperator::test_create_execute_error_body
diff --git a/tests/providers/google/cloud/operators/test_kubernetes_engine.py b/tests/providers/google/cloud/operators/test_kubernetes_engine.py
index 05c7bf2..c71cc99 100644
--- a/tests/providers/google/cloud/operators/test_kubernetes_engine.py
+++ b/tests/providers/google/cloud/operators/test_kubernetes_engine.py
@@ -81,9 +81,7 @@
KUB_OP_PATH = "airflow.providers.cncf.kubernetes.operators.pod.KubernetesPodOperator.{}"
GKE_HOOK_MODULE_PATH = "airflow.providers.google.cloud.operators.kubernetes_engine"
GKE_HOOK_PATH = f"{GKE_HOOK_MODULE_PATH}.GKEHook"
-GKE_POD_HOOK_PATH = f"{GKE_HOOK_MODULE_PATH}.GKEPodHook"
-GKE_DEPLOYMENT_HOOK_PATH = f"{GKE_HOOK_MODULE_PATH}.GKEDeploymentHook"
-GKE_JOB_HOOK_PATH = f"{GKE_HOOK_MODULE_PATH}.GKEJobHook"
+GKE_KUBERNETES_HOOK = f"{GKE_HOOK_MODULE_PATH}.GKEKubernetesHook"
GKE_K8S_HOOK_PATH = f"{GKE_HOOK_MODULE_PATH}.GKEKubernetesHook"
KUB_OPERATOR_EXEC = "airflow.providers.cncf.kubernetes.operators.pod.KubernetesPodOperator.execute"
KUB_JOB_OPERATOR_EXEC = "airflow.providers.cncf.kubernetes.operators.job.KubernetesJobOperator.execute"
@@ -502,8 +500,8 @@
@mock.patch(TEMP_FILE)
@mock.patch(f"{GKE_CLUSTER_AUTH_DETAILS_PATH}.fetch_cluster_info")
@mock.patch(GKE_HOOK_PATH)
- @mock.patch(f"{GKE_DEPLOYMENT_HOOK_PATH}.check_kueue_deployment_running")
- @mock.patch(GKE_POD_HOOK_PATH)
+ @mock.patch(f"{GKE_KUBERNETES_HOOK}.check_kueue_deployment_running")
+ @mock.patch(GKE_KUBERNETES_HOOK)
def test_execute(self, mock_pod_hook, mock_deployment, mock_hook, fetch_cluster_info_mock, file_mock):
mock_pod_hook.return_value.apply_from_yaml_file.side_effect = mock.MagicMock()
fetch_cluster_info_mock.return_value = (CLUSTER_URL, SSL_CA_CERT)
@@ -515,9 +513,9 @@
@mock.patch.dict(os.environ, {})
@mock.patch(TEMP_FILE)
@mock.patch(f"{GKE_CLUSTER_AUTH_DETAILS_PATH}.fetch_cluster_info")
- @mock.patch(GKE_DEPLOYMENT_HOOK_PATH)
+ @mock.patch(GKE_KUBERNETES_HOOK)
@mock.patch(GKE_HOOK_PATH)
- @mock.patch(GKE_POD_HOOK_PATH)
+ @mock.patch(GKE_KUBERNETES_HOOK)
def test_execute_autoscaled_cluster(
self, mock_pod_hook, mock_hook, mock_depl_hook, fetch_cluster_info_mock, file_mock, caplog
):
@@ -534,7 +532,7 @@
@mock.patch(TEMP_FILE)
@mock.patch(f"{GKE_CLUSTER_AUTH_DETAILS_PATH}.fetch_cluster_info")
@mock.patch(GKE_HOOK_PATH)
- @mock.patch(GKE_POD_HOOK_PATH)
+ @mock.patch(GKE_KUBERNETES_HOOK)
def test_execute_autoscaled_cluster_check_error(
self, mock_pod_hook, mock_hook, fetch_cluster_info_mock, file_mock, caplog
):
@@ -550,7 +548,7 @@
@mock.patch(TEMP_FILE)
@mock.patch(f"{GKE_CLUSTER_AUTH_DETAILS_PATH}.fetch_cluster_info")
@mock.patch(GKE_HOOK_PATH)
- @mock.patch(GKE_POD_HOOK_PATH)
+ @mock.patch(GKE_KUBERNETES_HOOK)
def test_execute_non_autoscaled_cluster_check_error(
self, mock_pod_hook, mock_hook, fetch_cluster_info_mock, file_mock, caplog
):
@@ -916,7 +914,7 @@
@mock.patch(TEMP_FILE)
@mock.patch(f"{GKE_CLUSTER_AUTH_DETAILS_PATH}.fetch_cluster_info")
@mock.patch(GKE_HOOK_PATH)
- @mock.patch(GKE_JOB_HOOK_PATH)
+ @mock.patch(GKE_KUBERNETES_HOOK)
def test_execute(self, mock_job_hook, mock_hook, fetch_cluster_info_mock, file_mock):
mock_job_hook.return_value.get_job.return_value = mock.MagicMock()
fetch_cluster_info_mock.return_value = (CLUSTER_URL, SSL_CA_CERT)
@@ -931,7 +929,7 @@
@mock.patch(TEMP_FILE)
@mock.patch(f"{GKE_CLUSTER_AUTH_DETAILS_PATH}.fetch_cluster_info")
@mock.patch(GKE_HOOK_PATH)
- @mock.patch(GKE_JOB_HOOK_PATH)
+ @mock.patch(GKE_KUBERNETES_HOOK)
def test_execute_with_impersonation_service_account(
self, mock_job_hook, mock_hook, fetch_cluster_info_mock, file_mock, get_con_mock
):
@@ -949,7 +947,7 @@
@mock.patch(TEMP_FILE)
@mock.patch(f"{GKE_CLUSTER_AUTH_DETAILS_PATH}.fetch_cluster_info")
@mock.patch(GKE_HOOK_PATH)
- @mock.patch(GKE_JOB_HOOK_PATH)
+ @mock.patch(GKE_KUBERNETES_HOOK)
def test_execute_with_impersonation_service_chain_one_element(
self, mock_job_hook, mock_hook, fetch_cluster_info_mock, file_mock, get_con_mock
):