| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| from __future__ import annotations |
| |
| import os |
| import re |
| import sys |
| import uuid |
| from unittest import mock |
| from unittest.mock import MagicMock |
| |
| import pytest |
| from dateutil import parser |
| from kubernetes.client import ApiClient, models as k8s |
| from parameterized import parameterized |
| |
| from airflow import __version__ |
| from airflow.exceptions import AirflowConfigException, PodReconciliationError |
| from airflow.kubernetes.pod_generator import ( |
| PodDefaults, |
| PodGenerator, |
| datetime_to_label_safe_datestring, |
| extend_object_field, |
| merge_objects, |
| ) |
| from airflow.kubernetes.secret import Secret |
| |
| |
class TestPodGenerator:
    """Tests for ``PodGenerator`` and the merge helpers imported from ``airflow.kubernetes.pod_generator``."""

    @staticmethod
    def _fixture_path(filename: str) -> str:
        """
        Return the path of a fixture file that lives in this test module's directory.

        Resolving against ``__file__`` keeps the tests independent of the current working
        directory and of whatever happens to be the first entry of ``sys.path``.

        :param filename: bare file name of the fixture, e.g. ``pod.yaml``
        :return: ``filename`` joined onto the directory of this module
        """
        return os.path.join(os.path.dirname(__file__), filename)

    def setup_method(self):
        """Create the identifiers, labels, annotations and the expected pod shared by the tests."""
        self.static_uuid = uuid.UUID('cf4a56d2-8101-4217-b027-2af6216feb48')
        # Serialized form expected when the ``pod.yaml`` style fixture is deserialized.
        self.deserialize_result = {
            'apiVersion': 'v1',
            'kind': 'Pod',
            'metadata': {'name': 'memory-demo', 'namespace': 'mem-example'},
            'spec': {
                'containers': [
                    {
                        'args': ['--vm', '1', '--vm-bytes', '150M', '--vm-hang', '1'],
                        'command': ['stress'],
                        'image': 'ghcr.io/apache/airflow-stress:1.0.4-2021.07.04',
                        'name': 'memory-demo-ctr',
                        'resources': {'limits': {'memory': '200Mi'}, 'requests': {'memory': '100Mi'}},
                    }
                ]
            },
        }

        self.envs = {'ENVIRONMENT': 'prod', 'LOG_LEVEL': 'warning'}
        self.secrets = [
            # This should be a secretRef
            Secret('env', None, 'secret_a'),
            # This should be a single secret mounted in volumeMounts
            Secret('volume', '/etc/foo', 'secret_b'),
            # This should produce a single secret mounted in env
            Secret('env', 'TARGET', 'secret_b', 'source_b'),
        ]

        self.execution_date = parser.parse('2020-08-24 00:00:00.000000')
        self.execution_date_label = datetime_to_label_safe_datestring(self.execution_date)
        self.dag_id = 'dag_id'
        self.task_id = 'task_id'
        self.try_number = 3
        self.labels = {
            'airflow-worker': 'uuid',
            'dag_id': self.dag_id,
            'execution_date': self.execution_date_label,
            'task_id': self.task_id,
            'try_number': str(self.try_number),
            'airflow_version': __version__.replace('+', '-'),
            'kubernetes_executor': 'True',
        }
        self.annotations = {
            'dag_id': self.dag_id,
            'task_id': self.task_id,
            'execution_date': self.execution_date.isoformat(),
            'try_number': str(self.try_number),
        }
        self.metadata = {
            'labels': self.labels,
            'name': 'pod_id-' + self.static_uuid.hex,
            'namespace': 'namespace',
            'annotations': self.annotations,
        }

        self.resources = k8s.V1ResourceRequirements(
            requests={
                "cpu": 1,
                "memory": "1Gi",
                "ephemeral-storage": "2Gi",
            },
            limits={"cpu": 2, "memory": "2Gi", "ephemeral-storage": "4Gi", 'nvidia.com/gpu': 1},
        )

        self.k8s_client = ApiClient()
        # Baseline pod that individual tests tweak before comparing against the generated result.
        self.expected = k8s.V1Pod(
            api_version="v1",
            kind="Pod",
            metadata=k8s.V1ObjectMeta(
                namespace="default",
                name='myapp-pod-' + self.static_uuid.hex,
                labels={'app': 'myapp'},
            ),
            spec=k8s.V1PodSpec(
                containers=[
                    k8s.V1Container(
                        name='base',
                        image='busybox',
                        command=['sh', '-c', 'echo Hello Kubernetes!'],
                        env=[
                            k8s.V1EnvVar(name='ENVIRONMENT', value='prod'),
                            k8s.V1EnvVar(
                                name="LOG_LEVEL",
                                value='warning',
                            ),
                            k8s.V1EnvVar(
                                name='TARGET',
                                value_from=k8s.V1EnvVarSource(
                                    secret_key_ref=k8s.V1SecretKeySelector(name='secret_b', key='source_b')
                                ),
                            ),
                        ],
                        env_from=[
                            k8s.V1EnvFromSource(config_map_ref=k8s.V1ConfigMapEnvSource(name='configmap_a')),
                            k8s.V1EnvFromSource(config_map_ref=k8s.V1ConfigMapEnvSource(name='configmap_b')),
                            k8s.V1EnvFromSource(secret_ref=k8s.V1SecretEnvSource(name='secret_a')),
                        ],
                        ports=[k8s.V1ContainerPort(name="foo", container_port=1234)],
                        resources=k8s.V1ResourceRequirements(
                            requests={'memory': '100Mi'},
                            limits={
                                'memory': '200Mi',
                            },
                        ),
                    )
                ],
                security_context=k8s.V1PodSecurityContext(
                    fs_group=2000,
                    run_as_user=1000,
                ),
                host_network=True,
                image_pull_secrets=[
                    k8s.V1LocalObjectReference(name="pull_secret_a"),
                    k8s.V1LocalObjectReference(name="pull_secret_b"),
                ],
            ),
        )

    @mock.patch('uuid.uuid4')
    def test_gen_pod_extract_xcom(self, mock_uuid):
        """``gen_pod`` with ``extract_xcom=True`` adds the xcom sidecar, volume and volume mount."""
        mock_uuid.return_value = self.static_uuid
        path = self._fixture_path('pod_generator_base_with_secrets.yaml')

        pod_generator = PodGenerator(pod_template_file=path, extract_xcom=True)
        result = pod_generator.gen_pod()
        container_two = {
            'name': 'airflow-xcom-sidecar',
            'image': "alpine",
            'command': ['sh', '-c', PodDefaults.XCOM_CMD],
            'volumeMounts': [{'name': 'xcom', 'mountPath': '/airflow/xcom'}],
            'resources': {'requests': {'cpu': '1m'}},
        }
        self.expected.spec.containers.append(container_two)
        base_container: k8s.V1Container = self.expected.spec.containers[0]
        base_container.volume_mounts = base_container.volume_mounts or []
        base_container.volume_mounts.append(k8s.V1VolumeMount(name="xcom", mount_path="/airflow/xcom"))
        self.expected.spec.containers[0] = base_container
        self.expected.spec.volumes = self.expected.spec.volumes or []
        self.expected.spec.volumes.append(
            k8s.V1Volume(
                name='xcom',
                empty_dir={},
            )
        )
        result_dict = self.k8s_client.sanitize_for_serialization(result)
        expected_dict = self.k8s_client.sanitize_for_serialization(self.expected)

        assert result_dict == expected_dict

    def test_from_obj(self):
        """``from_obj`` handles both a ``pod_override`` pod and a legacy ``KubernetesExecutor`` dict."""
        result = PodGenerator.from_obj(
            {
                "pod_override": k8s.V1Pod(
                    api_version="v1",
                    kind="Pod",
                    metadata=k8s.V1ObjectMeta(name="foo", annotations={"test": "annotation"}),
                    spec=k8s.V1PodSpec(
                        containers=[
                            k8s.V1Container(
                                name="base",
                                volume_mounts=[
                                    k8s.V1VolumeMount(
                                        mount_path="/foo/", name="example-kubernetes-test-volume"
                                    )
                                ],
                            )
                        ],
                        volumes=[
                            k8s.V1Volume(
                                name="example-kubernetes-test-volume",
                                host_path=k8s.V1HostPathVolumeSource(path="/tmp/"),
                            )
                        ],
                    ),
                )
            }
        )
        result = self.k8s_client.sanitize_for_serialization(result)

        assert {
            'apiVersion': 'v1',
            'kind': 'Pod',
            'metadata': {
                'name': 'foo',
                'annotations': {'test': 'annotation'},
            },
            'spec': {
                'containers': [
                    {
                        'name': 'base',
                        'volumeMounts': [{'mountPath': '/foo/', 'name': 'example-kubernetes-test-volume'}],
                    }
                ],
                'volumes': [{'hostPath': {'path': '/tmp/'}, 'name': 'example-kubernetes-test-volume'}],
            },
        } == result

        # Same configuration expressed through the dict-based "KubernetesExecutor" key.
        result = PodGenerator.from_obj(
            {
                "KubernetesExecutor": {
                    "annotations": {"test": "annotation"},
                    "volumes": [
                        {
                            "name": "example-kubernetes-test-volume",
                            "hostPath": {"path": "/tmp/"},
                        },
                    ],
                    "volume_mounts": [
                        {
                            "mountPath": "/foo/",
                            "name": "example-kubernetes-test-volume",
                        },
                    ],
                }
            }
        )

        result_from_pod = PodGenerator.from_obj(
            {
                "pod_override": k8s.V1Pod(
                    metadata=k8s.V1ObjectMeta(annotations={"test": "annotation"}),
                    spec=k8s.V1PodSpec(
                        containers=[
                            k8s.V1Container(
                                name="base",
                                volume_mounts=[
                                    k8s.V1VolumeMount(
                                        name="example-kubernetes-test-volume", mount_path="/foo/"
                                    )
                                ],
                            )
                        ],
                        volumes=[k8s.V1Volume(name="example-kubernetes-test-volume", host_path="/tmp/")],
                    ),
                )
            }
        )

        result = self.k8s_client.sanitize_for_serialization(result)
        result_from_pod = self.k8s_client.sanitize_for_serialization(result_from_pod)
        expected_from_pod = {
            'metadata': {'annotations': {'test': 'annotation'}},
            'spec': {
                'containers': [
                    {
                        'name': 'base',
                        'volumeMounts': [{'mountPath': '/foo/', 'name': 'example-kubernetes-test-volume'}],
                    }
                ],
                'volumes': [{'hostPath': '/tmp/', 'name': 'example-kubernetes-test-volume'}],
            },
        }
        assert (
            result_from_pod == expected_from_pod
        ), "There was a discrepancy between KubernetesExecutor and pod_override"

        assert {
            'apiVersion': 'v1',
            'kind': 'Pod',
            'metadata': {
                'annotations': {'test': 'annotation'},
            },
            'spec': {
                'containers': [
                    {
                        'args': [],
                        'command': [],
                        'env': [],
                        'envFrom': [],
                        'name': 'base',
                        'ports': [],
                        'volumeMounts': [{'mountPath': '/foo/', 'name': 'example-kubernetes-test-volume'}],
                    }
                ],
                'hostNetwork': False,
                'imagePullSecrets': [],
                'volumes': [{'hostPath': {'path': '/tmp/'}, 'name': 'example-kubernetes-test-volume'}],
            },
        } == result

    @mock.patch('uuid.uuid4')
    def test_reconcile_pods_empty_mutator_pod(self, mock_uuid):
        """Reconciling with ``None`` or an empty ``V1Pod`` returns a pod equal to the base pod."""
        mock_uuid.return_value = self.static_uuid
        path = self._fixture_path('pod_generator_base_with_secrets.yaml')

        pod_generator = PodGenerator(pod_template_file=path, extract_xcom=True)
        base_pod = pod_generator.gen_pod()
        mutator_pod = None
        name = 'name1-' + self.static_uuid.hex

        base_pod.metadata.name = name

        result = PodGenerator.reconcile_pods(base_pod, mutator_pod)
        assert base_pod == result

        mutator_pod = k8s.V1Pod()
        result = PodGenerator.reconcile_pods(base_pod, mutator_pod)
        assert base_pod == result

    @mock.patch('uuid.uuid4')
    def test_reconcile_pods(self, mock_uuid):
        """Reconciling the template pod with a mutator pod produces the expected merged pod."""
        mock_uuid.return_value = self.static_uuid
        path = self._fixture_path('pod_generator_base_with_secrets.yaml')

        base_pod = PodGenerator(pod_template_file=path, extract_xcom=False).gen_pod()

        mutator_pod = k8s.V1Pod(
            metadata=k8s.V1ObjectMeta(
                name="name2",
                labels={"bar": "baz"},
            ),
            spec=k8s.V1PodSpec(
                containers=[
                    k8s.V1Container(
                        image='',
                        name='name',
                        command=['/bin/command2.sh', 'arg2'],
                        volume_mounts=[
                            k8s.V1VolumeMount(mount_path="/foo/", name="example-kubernetes-test-volume2")
                        ],
                    )
                ],
                volumes=[
                    k8s.V1Volume(
                        host_path=k8s.V1HostPathVolumeSource(path="/tmp/"),
                        name="example-kubernetes-test-volume2",
                    )
                ],
            ),
        )

        result = PodGenerator.reconcile_pods(base_pod, mutator_pod)
        expected: k8s.V1Pod = self.expected
        expected.metadata.name = "name2"
        expected.metadata.labels['bar'] = 'baz'
        expected.spec.volumes = expected.spec.volumes or []
        expected.spec.volumes.append(
            k8s.V1Volume(
                host_path=k8s.V1HostPathVolumeSource(path="/tmp/"), name="example-kubernetes-test-volume2"
            )
        )

        base_container: k8s.V1Container = expected.spec.containers[0]
        base_container.command = ['/bin/command2.sh', 'arg2']
        base_container.volume_mounts = [
            k8s.V1VolumeMount(mount_path="/foo/", name="example-kubernetes-test-volume2")
        ]
        base_container.name = "name"
        expected.spec.containers[0] = base_container

        result_dict = self.k8s_client.sanitize_for_serialization(result)
        expected_dict = self.k8s_client.sanitize_for_serialization(expected)

        assert result_dict == expected_dict

    @pytest.mark.parametrize(
        'config_image, expected_image',
        [
            pytest.param('my_image:my_tag', 'my_image:my_tag', id='image_in_cfg'),
            pytest.param(None, 'busybox', id='no_image_in_cfg'),
        ],
    )
    @mock.patch('uuid.uuid4')
    def test_construct_pod(self, mock_uuid, config_image, expected_image):
        """``construct_pod`` merges the template, the executor override and the dynamic metadata."""
        template_file = self._fixture_path('pod_generator_base_with_secrets.yaml')
        worker_config = PodGenerator.deserialize_model_file(template_file)
        mock_uuid.return_value = self.static_uuid
        executor_config = k8s.V1Pod(
            spec=k8s.V1PodSpec(
                containers=[
                    k8s.V1Container(
                        name='', resources=k8s.V1ResourceRequirements(limits={'cpu': '1m', 'memory': '1G'})
                    )
                ]
            )
        )

        result = PodGenerator.construct_pod(
            dag_id=self.dag_id,
            task_id=self.task_id,
            pod_id='pod_id',
            kube_image=config_image,
            try_number=self.try_number,
            date=self.execution_date,
            args=['command'],
            pod_override_object=executor_config,
            base_worker_pod=worker_config,
            namespace='test_namespace',
            scheduler_job_id='uuid',
        )
        expected = self.expected
        expected.metadata.labels = self.labels
        expected.metadata.labels['app'] = 'myapp'
        expected.metadata.annotations = self.annotations
        expected.metadata.name = 'pod_id-' + self.static_uuid.hex
        expected.metadata.namespace = 'test_namespace'
        expected.spec.containers[0].args = ['command']
        expected.spec.containers[0].image = expected_image
        expected.spec.containers[0].resources = {'limits': {'cpu': '1m', 'memory': '1G'}}
        expected.spec.containers[0].env.append(
            k8s.V1EnvVar(
                name="AIRFLOW_IS_K8S_EXECUTOR_POD",
                value='True',
            )
        )
        result_dict = self.k8s_client.sanitize_for_serialization(result)
        expected_dict = self.k8s_client.sanitize_for_serialization(expected)

        assert expected_dict == result_dict

    @mock.patch('uuid.uuid4')
    def test_construct_pod_mapped_task(self, mock_uuid):
        """``construct_pod`` with ``map_index=0`` adds ``map_index`` to labels and annotations."""
        template_file = self._fixture_path('pod_generator_base.yaml')
        worker_config = PodGenerator.deserialize_model_file(template_file)
        mock_uuid.return_value = self.static_uuid

        result = PodGenerator.construct_pod(
            dag_id=self.dag_id,
            task_id=self.task_id,
            pod_id='pod_id',
            try_number=self.try_number,
            kube_image='',
            map_index=0,
            date=self.execution_date,
            args=['command'],
            pod_override_object=None,
            base_worker_pod=worker_config,
            namespace='test_namespace',
            scheduler_job_id='uuid',
        )
        expected = self.expected
        expected.metadata.labels = self.labels
        expected.metadata.labels['app'] = 'myapp'
        expected.metadata.labels['map_index'] = '0'
        expected.metadata.annotations = self.annotations
        expected.metadata.annotations['map_index'] = '0'
        expected.metadata.name = 'pod_id-' + self.static_uuid.hex
        expected.metadata.namespace = 'test_namespace'
        expected.spec.containers[0].args = ['command']
        # Trim the shared expectation down to what this template is expected to produce:
        # only the first env_from entry and no secret-backed TARGET env var.
        del expected.spec.containers[0].env_from[1:]
        del expected.spec.containers[0].env[-1:]
        expected.spec.containers[0].env.append(k8s.V1EnvVar(name="AIRFLOW_IS_K8S_EXECUTOR_POD", value='True'))
        result_dict = self.k8s_client.sanitize_for_serialization(result)
        expected_dict = self.k8s_client.sanitize_for_serialization(expected)

        assert expected_dict == result_dict

    @mock.patch('uuid.uuid4')
    def test_construct_pod_empty_executor_config(self, mock_uuid):
        """Without an executor override the result is the template plus the dynamic metadata."""
        path = self._fixture_path('pod_generator_base_with_secrets.yaml')
        worker_config = PodGenerator.deserialize_model_file(path)
        mock_uuid.return_value = self.static_uuid
        executor_config = None

        result = PodGenerator.construct_pod(
            dag_id='dag_id',
            task_id='task_id',
            pod_id='pod_id',
            kube_image='test-image',
            try_number=3,
            date=self.execution_date,
            args=['command'],
            pod_override_object=executor_config,
            base_worker_pod=worker_config,
            namespace='namespace',
            scheduler_job_id='uuid',
        )
        sanitized_result = self.k8s_client.sanitize_for_serialization(result)
        worker_config.spec.containers[0].image = "test-image"
        worker_config.spec.containers[0].args = ["command"]
        worker_config.metadata.annotations = self.annotations
        worker_config.metadata.labels = self.labels
        worker_config.metadata.labels['app'] = 'myapp'
        worker_config.metadata.name = 'pod_id-' + self.static_uuid.hex
        worker_config.metadata.namespace = 'namespace'
        worker_config.spec.containers[0].env.append(
            k8s.V1EnvVar(name="AIRFLOW_IS_K8S_EXECUTOR_POD", value='True')
        )
        worker_config_result = self.k8s_client.sanitize_for_serialization(worker_config)
        assert worker_config_result == sanitized_result

    @mock.patch('uuid.uuid4')
    def test_construct_pod_attribute_error(self, mock_uuid):
        """
        After upgrading k8s library we might get attribute error.
        In this case it should raise PodReconciliationError
        """
        path = self._fixture_path('pod_generator_base_with_secrets.yaml')
        worker_config = PodGenerator.deserialize_model_file(path)
        mock_uuid.return_value = self.static_uuid
        executor_config = MagicMock()
        executor_config.side_effect = AttributeError('error')

        with pytest.raises(PodReconciliationError):
            PodGenerator.construct_pod(
                dag_id='dag_id',
                task_id='task_id',
                pod_id='pod_id',
                kube_image='test-image',
                try_number=3,
                date=self.execution_date,
                args=['command'],
                pod_override_object=executor_config,
                base_worker_pod=worker_config,
                namespace='namespace',
                scheduler_job_id='uuid',
            )

    @mock.patch('uuid.uuid4')
    def test_ensure_max_label_length(self, mock_uuid):
        """Over-long ids are truncated in the pod name and labels but kept intact in annotations."""
        mock_uuid.return_value = self.static_uuid
        path = self._fixture_path('pod_generator_base_with_secrets.yaml')
        worker_config = PodGenerator.deserialize_model_file(path)

        result = PodGenerator.construct_pod(
            dag_id='a' * 512,
            task_id='a' * 512,
            pod_id='a' * 512,
            kube_image='a' * 512,
            try_number=3,
            date=self.execution_date,
            args=['command'],
            namespace='namespace',
            scheduler_job_id='a' * 512,
            pod_override_object=None,
            base_worker_pod=worker_config,
        )

        assert result.metadata.name == 'a' * 30 + '-' + self.static_uuid.hex
        for label_value in result.metadata.labels.values():
            assert len(label_value) <= 63

        assert 'a' * 512 == result.metadata.annotations['dag_id']
        assert 'a' * 512 == result.metadata.annotations['task_id']

    def test_merge_objects_empty(self):
        """``merge_objects`` returns the populated side when the other is ``None`` or empty."""
        annotations = {'foo1': 'bar1'}
        base_obj = k8s.V1ObjectMeta(annotations=annotations)
        client_obj = None
        res = merge_objects(base_obj, client_obj)
        assert base_obj == res

        client_obj = k8s.V1ObjectMeta()
        res = merge_objects(base_obj, client_obj)
        assert base_obj == res

        client_obj = k8s.V1ObjectMeta(annotations=annotations)
        base_obj = None
        res = merge_objects(base_obj, client_obj)
        assert client_obj == res

        base_obj = k8s.V1ObjectMeta()
        res = merge_objects(base_obj, client_obj)
        assert client_obj == res

    def test_merge_objects(self):
        """The client's annotations win while labels only set on the base are carried over."""
        base_annotations = {'foo1': 'bar1'}
        base_labels = {'foo1': 'bar1'}
        client_annotations = {'foo2': 'bar2'}
        base_obj = k8s.V1ObjectMeta(annotations=base_annotations, labels=base_labels)
        client_obj = k8s.V1ObjectMeta(annotations=client_annotations)
        res = merge_objects(base_obj, client_obj)
        client_obj.labels = base_labels
        assert client_obj == res

    def test_extend_object_field_empty(self):
        """``extend_object_field`` copes with the list field being unset on either side."""
        ports = [k8s.V1ContainerPort(container_port=1, name='port')]
        base_obj = k8s.V1Container(name='base_container', ports=ports)
        client_obj = k8s.V1Container(name='client_container')
        res = extend_object_field(base_obj, client_obj, 'ports')
        client_obj.ports = ports
        assert client_obj == res

        base_obj = k8s.V1Container(name='base_container')
        client_obj = k8s.V1Container(name='base_container', ports=ports)
        res = extend_object_field(base_obj, client_obj, 'ports')
        assert client_obj == res

    def test_extend_object_field_not_list(self):
        """``extend_object_field`` raises ``ValueError`` when the field holds a non-list value."""
        base_obj = k8s.V1Container(name='base_container', image='image')
        client_obj = k8s.V1Container(name='client_container')
        with pytest.raises(ValueError):
            extend_object_field(base_obj, client_obj, 'image')
        base_obj = k8s.V1Container(name='base_container')
        client_obj = k8s.V1Container(name='client_container', image='image')
        with pytest.raises(ValueError):
            extend_object_field(base_obj, client_obj, 'image')

    def test_extend_object_field(self):
        """The merged field is the base list followed by the client list."""
        base_ports = [k8s.V1ContainerPort(container_port=1, name='base_port')]
        base_obj = k8s.V1Container(name='base_container', ports=base_ports)
        client_ports = [k8s.V1ContainerPort(container_port=1, name='client_port')]
        client_obj = k8s.V1Container(name='client_container', ports=client_ports)
        res = extend_object_field(base_obj, client_obj, 'ports')
        client_obj.ports = base_ports + client_ports
        assert client_obj == res

    def test_reconcile_containers_empty(self):
        """``reconcile_containers`` returns the non-empty list, or ``[]`` when both are empty."""
        base_objs = [k8s.V1Container(name='base_container')]
        client_objs = []
        res = PodGenerator.reconcile_containers(base_objs, client_objs)
        assert base_objs == res

        client_objs = [k8s.V1Container(name='client_container')]
        base_objs = []
        res = PodGenerator.reconcile_containers(base_objs, client_objs)
        assert client_objs == res

        res = PodGenerator.reconcile_containers([], [])
        assert res == []

    def test_reconcile_containers(self):
        """Containers are merged pairwise: ports are concatenated, unset client fields are filled."""
        base_ports = [k8s.V1ContainerPort(container_port=1, name='base_port')]
        base_objs = [
            k8s.V1Container(name='base_container1', ports=base_ports),
            k8s.V1Container(name='base_container2', image='base_image'),
        ]
        client_ports = [k8s.V1ContainerPort(container_port=2, name='client_port')]
        client_objs = [
            k8s.V1Container(name='client_container1', ports=client_ports),
            k8s.V1Container(name='client_container2', image='client_image'),
        ]
        res = PodGenerator.reconcile_containers(base_objs, client_objs)
        client_objs[0].ports = base_ports + client_ports
        assert client_objs == res

        # Second scenario: the second client container has no image, so the base image is used.
        base_ports = [k8s.V1ContainerPort(container_port=1, name='base_port')]
        base_objs = [
            k8s.V1Container(name='base_container1', ports=base_ports),
            k8s.V1Container(name='base_container2', image='base_image'),
        ]
        client_ports = [k8s.V1ContainerPort(container_port=2, name='client_port')]
        client_objs = [
            k8s.V1Container(name='client_container1', ports=client_ports),
            k8s.V1Container(name='client_container2', stdin=True),
        ]
        res = PodGenerator.reconcile_containers(base_objs, client_objs)
        client_objs[0].ports = base_ports + client_ports
        client_objs[1].image = 'base_image'
        assert client_objs == res

    def test_reconcile_specs_empty(self):
        """``reconcile_specs`` returns whichever spec is present when the other is ``None``."""
        base_spec = k8s.V1PodSpec(containers=[])
        client_spec = None
        res = PodGenerator.reconcile_specs(base_spec, client_spec)
        assert base_spec == res

        base_spec = None
        client_spec = k8s.V1PodSpec(containers=[])
        res = PodGenerator.reconcile_specs(base_spec, client_spec)
        assert client_spec == res

    def test_reconcile_specs(self):
        """Client spec values take precedence; fields only set on the base spec are kept."""
        base_objs = [k8s.V1Container(name='base_container1', image='base_image')]
        client_objs = [k8s.V1Container(name='client_container1')]
        base_spec = k8s.V1PodSpec(priority=1, active_deadline_seconds=100, containers=base_objs)
        client_spec = k8s.V1PodSpec(priority=2, hostname='local', containers=client_objs)
        res = PodGenerator.reconcile_specs(base_spec, client_spec)
        client_spec.containers = [k8s.V1Container(name='client_container1', image='base_image')]
        client_spec.active_deadline_seconds = 100
        assert client_spec == res

    def test_reconcile_specs_init_containers(self):
        """Init containers from the base and the client spec are concatenated, base first."""
        base_spec = k8s.V1PodSpec(containers=[], init_containers=[k8s.V1Container(name='base_container1')])
        client_spec = k8s.V1PodSpec(
            containers=[], init_containers=[k8s.V1Container(name='client_container1')]
        )
        res = PodGenerator.reconcile_specs(base_spec, client_spec)
        assert res.init_containers == base_spec.init_containers + client_spec.init_containers

    def test_deserialize_model_file(self):
        """Deserializing the ``pod.yaml`` fixture yields the expected serialized pod."""
        path = self._fixture_path('pod.yaml')
        result = PodGenerator.deserialize_model_file(path)
        sanitized_res = self.k8s_client.sanitize_for_serialization(result)
        assert sanitized_res == self.deserialize_result

    @parameterized.expand(
        (
            ("max_label_length", "a" * 63),
            ("max_subdomain_length", "a" * 253),
            (
                "tiny",
                "aaa",
            ),
        )
    )
    def test_pod_name_confirm_to_max_length(self, _, pod_id):
        """Generated pod ids stay within the length limits whatever the length of the prefix."""
        name = PodGenerator.make_unique_pod_id(pod_id)
        assert len(name) <= 253
        parts = name.split("-")

        # 63 is the MAX_LABEL_LEN in pod_generator.py
        # 33 is the length of uuid4 + 1 for the separating '-' (32 + 1)
        # 30 is the max length of the prefix
        # so 30 = 63 - (32 + 1)
        assert len(parts[0]) <= 30
        assert len(parts[1]) == 32

    @parameterized.expand(
        (
            ("pod-name-with-hyphen-", "pod-name-with-hyphen"),
            ("pod-name-with-double-hyphen--", "pod-name-with-double-hyphen"),
            ("pod0-name", "pod0-name"),
            ("simple", "simple"),
            ("pod-name-with-dot.", "pod-name-with-dot"),
            ("pod-name-with-double-dot..", "pod-name-with-double-dot"),
            ("pod-name-with-hyphen-dot-.", "pod-name-with-hyphen-dot"),
        )
    )
    def test_pod_name_is_valid(self, pod_id, expected_starts_with):
        """Generated pod ids match the allowed name pattern and drop trailing ``-``/``.`` characters."""
        name = PodGenerator.make_unique_pod_id(pod_id)

        regex = r"^[a-z0-9]([-a-z0-9]*[a-z0-9])?(\.[a-z0-9]([-a-z0-9]*[a-z0-9])?)*$"
        assert (
            len(name) <= 253 and all(ch.lower() == ch for ch in name) and re.match(regex, name)
        ), "pod_id is invalid - fails allowed regex check"

        assert name.rsplit("-", 1)[0] == expected_starts_with

    def test_deserialize_model_string(self):
        """``deserialize_model_file`` also accepts the YAML document itself instead of a path."""
        fixture = """
apiVersion: v1
kind: Pod
metadata:
  name: memory-demo
  namespace: mem-example
spec:
  containers:
    - name: memory-demo-ctr
      image: ghcr.io/apache/airflow-stress:1.0.4-2021.07.04
      resources:
        limits:
          memory: "200Mi"
        requests:
          memory: "100Mi"
      command: ["stress"]
      args: ["--vm", "1", "--vm-bytes", "150M", "--vm-hang", "1"]
        """
        result = PodGenerator.deserialize_model_file(fixture)
        sanitized_res = self.k8s_client.sanitize_for_serialization(result)
        assert sanitized_res == self.deserialize_result

    def test_validate_pod_generator(self):
        """``PodGenerator`` requires exactly one of ``pod`` and ``pod_template_file``."""
        with pytest.raises(AirflowConfigException):
            PodGenerator(pod=k8s.V1Pod(), pod_template_file='k')
        with pytest.raises(AirflowConfigException):
            PodGenerator()
        PodGenerator(pod_template_file=self._fixture_path('pod.yaml'))
        PodGenerator(pod=k8s.V1Pod())