blob: ec7a8c8f7a0c574c2c6754ad6e17955507e894cd [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations
from unittest.mock import MagicMock, patch
import pytest
import yaml
from kubernetes.client.rest import ApiException
from airflow import DAG
from airflow.providers.cncf.kubernetes.operators.resource import (
KubernetesCreateResourceOperator,
KubernetesDeleteResourceOperator,
)
from airflow.utils import timezone
TEST_VALID_RESOURCE_YAML = """
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
name: test_pvc
spec:
accessModes:
- ReadWriteOnce
storageClassName: standard
resources:
requests:
storage: 5Gi
"""
TEST_VALID_LIST_RESOURCE_YAML = """
apiVersion: v1
kind: List
items:
- apiVersion: v1
kind: PersistentVolumeClaim
metadata:
name: test_pvc_1
- apiVersion: v1
kind: PersistentVolumeClaim
metadata:
name: test_pvc_2
"""
TEST_VALID_CRD_YAML = """
apiVersion: ray.io/v1
kind: RayJob
metadata:
name: rayjob-sample
spec:
entrypoint: python /home/ray/program/job.py
shutdownAfterJobFinishes: true
"""
TEST_NOT_NAMESPACED_CRD_YAML = """
apiVersion: kueue.x-k8s.io/v1beta1
kind: ResourceFlavor
metadata:
name: default-flavor-test
"""
HOOK_CLASS = "airflow.providers.cncf.kubernetes.hooks.kubernetes.KubernetesHook"
@pytest.mark.db_test
@patch("airflow.utils.context.Context")
class TestKubernetesXResourceOperator:
@pytest.fixture(autouse=True)
def setup_tests(self, dag_maker):
self._default_client_patch = patch(f"{HOOK_CLASS}._get_default_client")
self._default_client_mock = self._default_client_patch.start()
yield
patch.stopall()
def setup_method(self):
args = {"owner": "airflow", "start_date": timezone.datetime(2020, 2, 1)}
self.dag = DAG("test_dag_id", default_args=args)
@patch("kubernetes.config.load_kube_config")
@patch("kubernetes.client.api.CoreV1Api.create_namespaced_persistent_volume_claim")
def test_create_application_from_yaml(
self, mock_create_namespaced_persistent_volume_claim, mock_load_kube_config, context
):
op = KubernetesCreateResourceOperator(
yaml_conf=TEST_VALID_RESOURCE_YAML,
dag=self.dag,
kubernetes_conn_id="kubernetes_default",
task_id="test_task_id",
config_file="/foo/bar",
)
op.execute(context)
mock_create_namespaced_persistent_volume_claim.assert_called_once_with(
body=yaml.safe_load(TEST_VALID_RESOURCE_YAML), namespace="default"
)
@patch("kubernetes.client.api.CoreV1Api.create_namespaced_persistent_volume_claim")
def test_create_application_from_yaml_list(self, mock_create_namespaced_persistent_volume_claim, context):
op = KubernetesCreateResourceOperator(
yaml_conf=TEST_VALID_LIST_RESOURCE_YAML,
dag=self.dag,
kubernetes_conn_id="kubernetes_default",
task_id="test_task_id",
)
op.execute(context)
assert mock_create_namespaced_persistent_volume_claim.call_count == 2
@patch("kubernetes.config.load_kube_config")
@patch("kubernetes.client.api.CoreV1Api.delete_namespaced_persistent_volume_claim")
def test_single_delete_application_from_yaml(
self, mock_delete_namespaced_persistent_volume_claim, mock_load_kube_config, context
):
op = KubernetesDeleteResourceOperator(
yaml_conf=TEST_VALID_RESOURCE_YAML,
dag=self.dag,
kubernetes_conn_id="kubernetes_default",
task_id="test_task_id",
config_file="/foo/bar",
)
op.execute(context)
mock_delete_namespaced_persistent_volume_claim.assert_called()
@patch("kubernetes.config.load_kube_config")
@patch("kubernetes.client.api.CoreV1Api.delete_namespaced_persistent_volume_claim")
def test_multi_delete_application_from_yaml(
self, mock_delete_namespaced_persistent_volume_claim, mock_load_kube_config, context
):
op = KubernetesDeleteResourceOperator(
yaml_conf=TEST_VALID_LIST_RESOURCE_YAML,
dag=self.dag,
kubernetes_conn_id="kubernetes_default",
task_id="test_task_id",
config_file="/foo/bar",
)
op.execute(context)
mock_delete_namespaced_persistent_volume_claim.assert_called()
@patch("kubernetes.client.api.CustomObjectsApi.create_namespaced_custom_object")
def test_create_custom_application_from_yaml(self, mock_create_namespaced_custom_object, context):
op = KubernetesCreateResourceOperator(
yaml_conf=TEST_VALID_CRD_YAML,
dag=self.dag,
kubernetes_conn_id="kubernetes_default",
task_id="test_task_id",
custom_resource_definition=True,
)
op.execute(context)
mock_create_namespaced_custom_object.assert_called_once_with(
"ray.io",
"v1",
"default",
"rayjobs",
yaml.safe_load(TEST_VALID_CRD_YAML),
)
@patch("kubernetes.client.api.CustomObjectsApi.delete_namespaced_custom_object")
def test_delete_custom_application_from_yaml(self, mock_delete_namespaced_custom_object, context):
op = KubernetesDeleteResourceOperator(
yaml_conf=TEST_VALID_CRD_YAML,
dag=self.dag,
kubernetes_conn_id="kubernetes_default",
task_id="test_task_id",
custom_resource_definition=True,
)
op.execute(context)
mock_delete_namespaced_custom_object.assert_called_once_with(
"ray.io",
"v1",
"default",
"rayjobs",
"rayjob-sample",
)
@patch("kubernetes.client.api.CustomObjectsApi.create_cluster_custom_object")
def test_create_not_namespaced_custom_app_from_yaml(self, mock_create_cluster_custom_object, context):
op = KubernetesCreateResourceOperator(
yaml_conf=TEST_NOT_NAMESPACED_CRD_YAML,
dag=self.dag,
kubernetes_conn_id="kubernetes_default",
task_id="test_task_id",
custom_resource_definition=True,
namespaced=False,
)
op.execute(context)
mock_create_cluster_custom_object.assert_called_once_with(
"kueue.x-k8s.io",
"v1beta1",
"resourceflavors",
yaml.safe_load(TEST_NOT_NAMESPACED_CRD_YAML),
)
@patch("kubernetes.client.api.CustomObjectsApi.delete_cluster_custom_object")
def test_delete_not_namespaced_custom_app_from_yaml(self, mock_delete_cluster_custom_object, context):
op = KubernetesDeleteResourceOperator(
yaml_conf=TEST_NOT_NAMESPACED_CRD_YAML,
dag=self.dag,
kubernetes_conn_id="kubernetes_default",
task_id="test_task_id",
custom_resource_definition=True,
namespaced=False,
)
op.execute(context)
mock_delete_cluster_custom_object.assert_called_once_with(
"kueue.x-k8s.io",
"v1beta1",
"resourceflavors",
"default-flavor-test",
)
@patch("kubernetes.config.load_kube_config")
@patch("airflow.providers.cncf.kubernetes.operators.resource.create_from_yaml")
def test_create_objects_retries_on_500_error(self, mock_create_from_yaml, mock_load_kube_config, context):
mock_create_from_yaml.side_effect = [
ApiException(status=500),
MagicMock(),
]
op = KubernetesCreateResourceOperator(
yaml_conf=TEST_VALID_RESOURCE_YAML,
dag=self.dag,
kubernetes_conn_id="kubernetes_default",
task_id="test_task_id",
config_file="/foo/bar",
)
op.execute(context)
assert mock_create_from_yaml.call_count == 2
@patch("kubernetes.config.load_kube_config")
@patch("airflow.providers.cncf.kubernetes.operators.resource.create_from_yaml")
def test_create_objects_fails_on_other_exception(
self, mock_create_from_yaml, mock_load_kube_config, context
):
mock_create_from_yaml.side_effect = [ApiException(status=404)]
op = KubernetesCreateResourceOperator(
yaml_conf=TEST_VALID_RESOURCE_YAML,
dag=self.dag,
kubernetes_conn_id="kubernetes_default",
task_id="test_task_id",
config_file="/foo/bar",
)
with pytest.raises(ApiException):
op.execute(context)
@patch("kubernetes.config.load_kube_config")
@patch("airflow.providers.cncf.kubernetes.operators.resource.create_from_yaml")
def test_create_objects_retries_three_times(self, mock_create_from_yaml, mock_load_kube_config, context):
mock_create_from_yaml.side_effect = [
ApiException(status=500),
ApiException(status=500),
ApiException(status=500),
ApiException(status=500),
]
op = KubernetesCreateResourceOperator(
yaml_conf=TEST_VALID_RESOURCE_YAML,
dag=self.dag,
kubernetes_conn_id="kubernetes_default",
task_id="test_task_id",
config_file="/foo/bar",
)
with pytest.raises(ApiException):
op.execute(context)
assert mock_create_from_yaml.call_count == 3