blob: 5a4efc73d438332ab65ad6664a867066e661353b [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import json
import sys
import unittest
from copy import copy
from unittest import mock
from unittest.mock import MagicMock, patch
import kubernetes.client.models as k8s
import pendulum
import pytest
from kubernetes.client.api_client import ApiClient
from kubernetes.client.rest import ApiException
from airflow.exceptions import AirflowException
from airflow.kubernetes.pod import Port
from airflow.kubernetes.pod_runtime_info_env import PodRuntimeInfoEnv
from airflow.kubernetes.secret import Secret
from airflow.kubernetes.volume import Volume
from airflow.kubernetes.volume_mount import VolumeMount
from airflow.models import DAG, DagRun, TaskInstance
from airflow.providers.cncf.kubernetes.hooks.kubernetes import KubernetesHook
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator
from airflow.providers.cncf.kubernetes.utils.pod_manager import PodManager
from airflow.providers.cncf.kubernetes.utils.xcom_sidecar import PodDefaults
from airflow.utils import timezone
from airflow.utils.types import DagRunType
from airflow.version import version as airflow_version
# noinspection DuplicatedCode
# Dotted path of ``KubernetesHook`` inside the operator's own module. Tests below pass it
# to ``mock.patch(HOOK_CLASS, new=MagicMock)`` so the operator is handed a mock hook.
HOOK_CLASS: str = "airflow.providers.cncf.kubernetes.operators.kubernetes_pod.KubernetesHook"
def create_context(task):
    """Build a minimal Airflow execution context for ``task``.

    A throwaway ``DAG`` with id ``"dag"`` and a manual ``DagRun`` dated
    2016-01-01 01:00 (Europe/Amsterdam) are attached to a fresh ``TaskInstance``.
    The instance's ``xcom_push`` is replaced with a ``mock.Mock`` so pushes are
    recorded on the mock instead of being performed.

    :param task: the operator under test.
    :return: dict with the keys ``dag``, ``run_id``, ``task``, ``ti`` and
        ``task_instance`` (the last two refer to the same ``TaskInstance``).
    """
    the_dag = DAG(dag_id="dag")
    logical_date = timezone.datetime(
        2016, 1, 1, 1, 0, 0, tzinfo=pendulum.timezone("Europe/Amsterdam")
    )
    run = DagRun(
        dag_id=the_dag.dag_id,
        execution_date=logical_date,
        run_id=DagRun.generate_run_id(DagRunType.MANUAL, logical_date),
    )

    ti = TaskInstance(task=task)
    ti.dag_run = run
    ti.dag_id = the_dag.dag_id
    # Capture XCom pushes on a mock rather than executing the real push.
    ti.xcom_push = mock.Mock()

    return dict(
        dag=the_dag,
        run_id=run.run_id,
        task=task,
        ti=ti,
        task_instance=ti,
    )
# noinspection DuplicatedCode,PyUnusedLocal
class TestKubernetesPodOperatorSystem(unittest.TestCase):
    """System tests for ``KubernetesPodOperator``.

    Unpatched tests run the operator with ``in_cluster=False`` so it launches a real
    pod in the ``default`` namespace. They then compare the pod object the operator
    built (``k.pod``), serialized with ``ApiClient.sanitize_for_serialization``,
    against ``self.expected_pod``.

    Tests decorated with ``mock.patch`` work differently. They replace the hook with
    ``MagicMock`` and stub ``PodManager.create_pod`` / ``await_pod_completion``, and
    only inspect the pod object passed to ``create_pod``.
    """

    def get_current_task_name(self):
        """Return a task-name suffix derived from this test's id.

        The dotted test id is converted to underscores and reversed, so the most
        specific part (the test method name) comes first.
        """
        # reverse test name to make pod name unique (it has limited length)
        return "_" + self.id().replace(".", "_")[::-1]

    def setUp(self):
        """Create the API client and the baseline pod manifest tests compare against.

        Keys are camelCase because the manifest is compared with the output of
        ``ApiClient.sanitize_for_serialization``. Tests mutate ``self.expected_pod``
        in place; ``setUp`` rebuilds it before every test.
        """
        self.maxDiff = None
        self.api_client = ApiClient()
        self.expected_pod = {
            'apiVersion': 'v1',
            'kind': 'Pod',
            'metadata': {
                'namespace': 'default',
                'name': mock.ANY,
                'annotations': {},
                'labels': {
                    'foo': 'bar',
                    'kubernetes_pod_operator': 'True',
                    'airflow_version': airflow_version.replace('+', '-'),
                    'run_id': 'manual__2016-01-01T0100000100-da4d1ce7b',
                    'dag_id': 'dag',
                    'task_id': 'task',
                    'try_number': '1',
                },
            },
            'spec': {
                'affinity': {},
                'containers': [
                    {
                        'image': 'ubuntu:16.04',
                        'args': ["echo 10"],
                        'command': ["bash", "-cx"],
                        'env': [],
                        'envFrom': [],
                        'resources': {},
                        'name': 'base',
                        'ports': [],
                        'volumeMounts': [],
                    }
                ],
                'hostNetwork': False,
                'imagePullSecrets': [],
                'initContainers': [],
                'nodeSelector': {},
                'restartPolicy': 'Never',
                'securityContext': {},
                'tolerations': [],
                'volumes': [],
            },
        }

    def tearDown(self):
        """Delete the pods in the ``default`` namespace after each test.

        NOTE: this deletes every pod in that namespace, not only the pods created here.
        """
        hook = KubernetesHook(conn_id=None, in_cluster=False)
        client = hook.core_v1_client
        client.delete_collection_namespaced_pod(namespace="default")

    @mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager.create_pod")
    @mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager.await_pod_completion")
    @mock.patch(HOOK_CLASS, new=MagicMock)
    def test_image_pull_secrets_correctly_set(self, await_pod_completion_mock, create_mock):
        """A string ``image_pull_secrets`` becomes a single ``V1LocalObjectReference``."""
        fake_pull_secrets = "fakeSecret"
        k = KubernetesPodOperator(
            namespace='default',
            image="ubuntu:16.04",
            cmds=["bash", "-cx"],
            arguments=["echo 10"],
            labels={"foo": "bar"},
            name="test",
            task_id="task",
            in_cluster=False,
            do_xcom_push=False,
            image_pull_secrets=fake_pull_secrets,
            cluster_context='default',
        )
        mock_pod = MagicMock()
        mock_pod.status.phase = 'Succeeded'
        await_pod_completion_mock.return_value = mock_pod
        context = create_context(k)
        k.execute(context=context)
        assert create_mock.call_args[1]['pod'].spec.image_pull_secrets == [
            k8s.V1LocalObjectReference(name=fake_pull_secrets)
        ]

    def test_working_pod(self):
        """A basic pod runs, and its spec and labels match the baseline manifest."""
        k = KubernetesPodOperator(
            namespace='default',
            image="ubuntu:16.04",
            cmds=["bash", "-cx"],
            arguments=["echo 10"],
            labels={"foo": "bar"},
            name="test",
            task_id="task",
            in_cluster=False,
            do_xcom_push=False,
        )
        context = create_context(k)
        k.execute(context)
        actual_pod = self.api_client.sanitize_for_serialization(k.pod)
        assert self.expected_pod['spec'] == actual_pod['spec']
        assert self.expected_pod['metadata']['labels'] == actual_pod['metadata']['labels']

    def test_pod_node_selectors(self):
        """``node_selectors`` is copied verbatim into ``spec.nodeSelector``."""
        node_selectors = {'beta.kubernetes.io/os': 'linux'}
        k = KubernetesPodOperator(
            namespace='default',
            image="ubuntu:16.04",
            cmds=["bash", "-cx"],
            arguments=["echo 10"],
            labels={"foo": "bar"},
            name="test",
            task_id="task",
            in_cluster=False,
            do_xcom_push=False,
            node_selectors=node_selectors,
        )
        context = create_context(k)
        k.execute(context)
        actual_pod = self.api_client.sanitize_for_serialization(k.pod)
        self.expected_pod['spec']['nodeSelector'] = node_selectors
        assert self.expected_pod == actual_pod

    def test_pod_resources(self):
        """The flat ``resources`` dict maps onto container ``requests`` and ``limits``."""
        resources = {
            'limit_cpu': 0.25,
            'limit_memory': '64Mi',
            'limit_ephemeral_storage': '2Gi',
            'request_cpu': '250m',
            'request_memory': '64Mi',
            'request_ephemeral_storage': '1Gi',
        }
        k = KubernetesPodOperator(
            namespace='default',
            image="ubuntu:16.04",
            cmds=["bash", "-cx"],
            arguments=["echo 10"],
            labels={"foo": "bar"},
            name="test",
            task_id="task",
            in_cluster=False,
            do_xcom_push=False,
            resources=resources,
        )
        context = create_context(k)
        k.execute(context)
        actual_pod = self.api_client.sanitize_for_serialization(k.pod)
        self.expected_pod['spec']['containers'][0]['resources'] = {
            'requests': {'memory': '64Mi', 'cpu': '250m', 'ephemeral-storage': '1Gi'},
            'limits': {'memory': '64Mi', 'cpu': 0.25, 'ephemeral-storage': '2Gi'},
        }
        assert self.expected_pod == actual_pod

    def test_pod_affinity(self):
        """``affinity`` is copied verbatim into ``spec.affinity``."""
        affinity = {
            'nodeAffinity': {
                'requiredDuringSchedulingIgnoredDuringExecution': {
                    'nodeSelectorTerms': [
                        {
                            'matchExpressions': [
                                {'key': 'beta.kubernetes.io/os', 'operator': 'In', 'values': ['linux']}
                            ]
                        }
                    ]
                }
            }
        }
        k = KubernetesPodOperator(
            namespace='default',
            image="ubuntu:16.04",
            cmds=["bash", "-cx"],
            arguments=["echo 10"],
            labels={"foo": "bar"},
            name="test",
            task_id="task",
            in_cluster=False,
            do_xcom_push=False,
            affinity=affinity,
        )
        context = create_context(k)
        k.execute(context=context)
        actual_pod = self.api_client.sanitize_for_serialization(k.pod)
        self.expected_pod['spec']['affinity'] = affinity
        assert self.expected_pod == actual_pod

    def test_port(self):
        """A ``Port('http', 80)`` becomes a named container port."""
        port = Port('http', 80)
        k = KubernetesPodOperator(
            namespace='default',
            image="ubuntu:16.04",
            cmds=["bash", "-cx"],
            arguments=["echo 10"],
            labels={"foo": "bar"},
            name="test",
            task_id="task",
            in_cluster=False,
            do_xcom_push=False,
            ports=[port],
        )
        context = create_context(k)
        k.execute(context=context)
        actual_pod = self.api_client.sanitize_for_serialization(k.pod)
        self.expected_pod['spec']['containers'][0]['ports'] = [{'name': 'http', 'containerPort': 80}]
        assert self.expected_pod == actual_pod

    def test_volume_mount(self):
        """A PVC-backed ``Volume`` plus ``VolumeMount`` is attached and writable.

        The container writes a file to the mount and cats it back. The test checks
        that the output line reaches ``PodManager.log``.
        NOTE(review): presumably requires a PVC named ``test-volume`` in the cluster.
        """
        with patch.object(PodManager, 'log') as mock_logger:
            volume_mount = VolumeMount(
                'test-volume', mount_path='/tmp/test_volume', sub_path=None, read_only=False
            )
            volume_config = {'persistentVolumeClaim': {'claimName': 'test-volume'}}
            volume = Volume(name='test-volume', configs=volume_config)
            args = [
                "echo \"retrieved from mount\" > /tmp/test_volume/test.txt "
                "&& cat /tmp/test_volume/test.txt"
            ]
            k = KubernetesPodOperator(
                namespace='default',
                image="ubuntu:16.04",
                cmds=["bash", "-cx"],
                arguments=args,
                labels={"foo": "bar"},
                volume_mounts=[volume_mount],
                volumes=[volume],
                is_delete_operator_pod=False,
                name="test",
                task_id="task",
                in_cluster=False,
                do_xcom_push=False,
            )
            context = create_context(k)
            k.execute(context=context)
            mock_logger.info.assert_any_call('retrieved from mount')
            actual_pod = self.api_client.sanitize_for_serialization(k.pod)
            # NOTE: ``copy`` is shallow, so the nested assignments below also modify
            # ``self.expected_pod``. That is harmless because setUp rebuilds it per test.
            expected_pod = copy(self.expected_pod)
            expected_pod['spec']['containers'][0]['args'] = args
            expected_pod['spec']['containers'][0]['volumeMounts'] = [
                {'name': 'test-volume', 'mountPath': '/tmp/test_volume', 'readOnly': False}
            ]
            expected_pod['spec']['volumes'] = [
                {'name': 'test-volume', 'persistentVolumeClaim': {'claimName': 'test-volume'}}
            ]
            assert expected_pod == actual_pod

    def test_run_as_user_root(self):
        """A pod-level security context with ``runAsUser: 0`` is set on the pod spec."""
        # ``security_context`` IS the pod's PodSecurityContext. Wrapping it in an extra
        # ``securityContext`` key would produce ``spec.securityContext.securityContext``,
        # which is not a PodSecurityContext field, so the setting would never apply.
        security_context = {
            'runAsUser': 0,
        }
        k = KubernetesPodOperator(
            namespace='default',
            image="ubuntu:16.04",
            cmds=["bash", "-cx"],
            arguments=["echo 10"],
            labels={"foo": "bar"},
            name="test",
            task_id="task",
            in_cluster=False,
            do_xcom_push=False,
            security_context=security_context,
        )
        context = create_context(k)
        k.execute(context)
        actual_pod = self.api_client.sanitize_for_serialization(k.pod)
        self.expected_pod['spec']['securityContext'] = security_context
        assert self.expected_pod == actual_pod

    def test_run_as_user_non_root(self):
        """A pod-level security context with ``runAsUser: 1000`` is set on the pod spec."""
        # Flat PodSecurityContext; see test_run_as_user_root for why it is not nested.
        security_context = {
            'runAsUser': 1000,
        }
        k = KubernetesPodOperator(
            namespace='default',
            image="ubuntu:16.04",
            cmds=["bash", "-cx"],
            arguments=["echo 10"],
            labels={"foo": "bar"},
            name="test",
            task_id="task",
            in_cluster=False,
            do_xcom_push=False,
            security_context=security_context,
        )
        context = create_context(k)
        k.execute(context)
        actual_pod = self.api_client.sanitize_for_serialization(k.pod)
        self.expected_pod['spec']['securityContext'] = security_context
        assert self.expected_pod == actual_pod

    def test_fs_group(self):
        """A pod-level security context with ``fsGroup: 1000`` is set on the pod spec."""
        # Flat PodSecurityContext: ``fsGroup`` must sit directly under spec.securityContext.
        security_context = {
            'fsGroup': 1000,
        }
        k = KubernetesPodOperator(
            namespace='default',
            image="ubuntu:16.04",
            cmds=["bash", "-cx"],
            arguments=["echo 10"],
            labels={"foo": "bar"},
            name="test",
            task_id="task",
            in_cluster=False,
            do_xcom_push=False,
            security_context=security_context,
        )
        context = create_context(k)
        k.execute(context)
        actual_pod = self.api_client.sanitize_for_serialization(k.pod)
        self.expected_pod['spec']['securityContext'] = security_context
        assert self.expected_pod == actual_pod

    def test_faulty_service_account(self):
        """pod creation should fail when service account does not exist"""
        service_account = "foobar"
        namespace = "default"
        k = KubernetesPodOperator(
            namespace=namespace,
            image="ubuntu:16.04",
            cmds=["bash", "-cx"],
            arguments=["echo 10"],
            labels={"foo": "bar"},
            name="test",
            task_id="task",
            in_cluster=False,
            do_xcom_push=False,
            startup_timeout_seconds=5,
            service_account_name=service_account,
        )
        context = create_context(k)
        pod = k.build_pod_request_obj(context)
        with pytest.raises(
            ApiException, match=f"error looking up service account {namespace}/{service_account}"
        ):
            k.get_or_create_pod(pod, context)

    def test_pod_failure(self):
        """
        Tests that the task fails when a pod reports a failure
        """
        bad_internal_command = ["foobar 10 "]
        k = KubernetesPodOperator(
            namespace='default',
            image="ubuntu:16.04",
            cmds=["bash", "-cx"],
            arguments=bad_internal_command,
            labels={"foo": "bar"},
            name="test",
            task_id="task",
            in_cluster=False,
            do_xcom_push=False,
        )
        # Build the context outside ``pytest.raises`` so only ``execute`` can satisfy it.
        context = create_context(k)
        with pytest.raises(AirflowException):
            k.execute(context)
        actual_pod = self.api_client.sanitize_for_serialization(k.pod)
        self.expected_pod['spec']['containers'][0]['args'] = bad_internal_command
        assert self.expected_pod == actual_pod

    def test_xcom_push(self):
        """With ``do_xcom_push=True``, ``execute`` returns the parsed ``return.json``.

        The XCom sidecar container, volume and volume mount from ``PodDefaults`` are
        also added to the pod.
        """
        return_value = '{"foo": "bar"\n, "buzz": 2}'
        args = [f'echo \'{return_value}\' > /airflow/xcom/return.json']
        k = KubernetesPodOperator(
            namespace='default',
            image="ubuntu:16.04",
            cmds=["bash", "-cx"],
            arguments=args,
            labels={"foo": "bar"},
            name="test",
            task_id="task",
            in_cluster=False,
            do_xcom_push=True,
        )
        context = create_context(k)
        assert k.execute(context) == json.loads(return_value)
        actual_pod = self.api_client.sanitize_for_serialization(k.pod)
        volume = self.api_client.sanitize_for_serialization(PodDefaults.VOLUME)
        volume_mount = self.api_client.sanitize_for_serialization(PodDefaults.VOLUME_MOUNT)
        container = self.api_client.sanitize_for_serialization(PodDefaults.SIDECAR_CONTAINER)
        self.expected_pod['spec']['containers'][0]['args'] = args
        self.expected_pod['spec']['containers'][0]['volumeMounts'].insert(0, volume_mount)
        self.expected_pod['spec']['volumes'].insert(0, volume)
        self.expected_pod['spec']['containers'].append(container)
        assert self.expected_pod == actual_pod

    @mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager.create_pod")
    @mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager.await_pod_completion")
    @mock.patch(HOOK_CLASS, new=MagicMock)
    def test_envs_from_configmaps(self, await_pod_completion_mock, create_mock):
        """Each name in ``configmaps`` becomes an ``env_from`` config-map reference."""
        # GIVEN
        configmap = 'test-configmap'
        # WHEN
        k = KubernetesPodOperator(
            namespace='default',
            image="ubuntu:16.04",
            cmds=["bash", "-cx"],
            arguments=["echo 10"],
            labels={"foo": "bar"},
            name="test",
            task_id="task",
            in_cluster=False,
            do_xcom_push=False,
            configmaps=[configmap],
        )
        # THEN
        mock_pod = MagicMock()
        mock_pod.status.phase = 'Succeeded'
        await_pod_completion_mock.return_value = mock_pod
        context = create_context(k)
        k.execute(context)
        assert create_mock.call_args[1]['pod'].spec.containers[0].env_from == [
            k8s.V1EnvFromSource(config_map_ref=k8s.V1ConfigMapEnvSource(name=configmap))
        ]

    @mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager.create_pod")
    @mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager.await_pod_completion")
    @mock.patch(HOOK_CLASS, new=MagicMock)
    def test_envs_from_secrets(self, await_pod_completion_mock, create_mock):
        """A ``Secret('env', None, name)`` becomes an ``env_from`` secret reference."""
        # GIVEN
        secret_ref = 'secret_name'
        secrets = [Secret('env', None, secret_ref)]
        # WHEN
        k = KubernetesPodOperator(
            namespace='default',
            image="ubuntu:16.04",
            cmds=["bash", "-cx"],
            arguments=["echo 10"],
            secrets=secrets,
            labels={"foo": "bar"},
            name="test",
            task_id="task",
            in_cluster=False,
            do_xcom_push=False,
        )
        # THEN
        mock_pod = MagicMock()
        mock_pod.status.phase = 'Succeeded'
        await_pod_completion_mock.return_value = mock_pod
        context = create_context(k)
        k.execute(context)
        assert create_mock.call_args[1]['pod'].spec.containers[0].env_from == [
            k8s.V1EnvFromSource(secret_ref=k8s.V1SecretEnvSource(name=secret_ref))
        ]

    def test_env_vars(self):
        """``env_vars`` and ``PodRuntimeInfoEnv`` entries become container env entries, in order."""
        # WHEN
        k = KubernetesPodOperator(
            namespace='default',
            image="ubuntu:16.04",
            cmds=["bash", "-cx"],
            arguments=["echo 10"],
            env_vars={
                "ENV1": "val1",
                "ENV2": "val2",
            },
            pod_runtime_info_envs=[PodRuntimeInfoEnv("ENV3", "status.podIP")],
            labels={"foo": "bar"},
            name="test",
            task_id="task",
            in_cluster=False,
            do_xcom_push=False,
        )
        context = create_context(k)
        k.execute(context)
        # THEN
        actual_pod = self.api_client.sanitize_for_serialization(k.pod)
        self.expected_pod['spec']['containers'][0]['env'] = [
            {'name': 'ENV1', 'value': 'val1'},
            {'name': 'ENV2', 'value': 'val2'},
            {'name': 'ENV3', 'valueFrom': {'fieldRef': {'fieldPath': 'status.podIP'}}},
        ]
        assert self.expected_pod == actual_pod

    def test_pod_template_file_with_overrides_system(self):
        """Labels and env vars given to the operator apply on top of ``basic_pod.yaml``.

        The template's XCom result is expected to be ``{"hello": "world"}``.
        """
        fixture = sys.path[0] + '/tests/kubernetes/basic_pod.yaml'
        k = KubernetesPodOperator(
            task_id="task" + self.get_current_task_name(),
            labels={"foo": "bar", "fizz": "buzz"},
            env_vars={"env_name": "value"},
            in_cluster=False,
            pod_template_file=fixture,
            do_xcom_push=True,
        )
        context = create_context(k)
        result = k.execute(context)
        assert result is not None
        assert k.pod.metadata.labels == {
            'fizz': 'buzz',
            'foo': 'bar',
            'airflow_version': mock.ANY,
            'dag_id': 'dag',
            'run_id': 'manual__2016-01-01T0100000100-da4d1ce7b',
            'kubernetes_pod_operator': 'True',
            'task_id': mock.ANY,
            'try_number': '1',
        }
        assert k.pod.spec.containers[0].env == [k8s.V1EnvVar(name="env_name", value="value")]
        assert result == {"hello": "world"}

    def test_init_container(self):
        """``init_containers`` and ``volumes`` are passed through to the pod spec."""
        # GIVEN
        volume_mounts = [
            k8s.V1VolumeMount(mount_path='/etc/foo', name='test-volume', sub_path=None, read_only=True)
        ]
        init_environments = [
            k8s.V1EnvVar(name='key1', value='value1'),
            k8s.V1EnvVar(name='key2', value='value2'),
        ]
        init_container = k8s.V1Container(
            name="init-container",
            image="ubuntu:16.04",
            env=init_environments,
            volume_mounts=volume_mounts,
            command=["bash", "-cx"],
            args=["echo 10"],
        )
        volume_config = {'persistentVolumeClaim': {'claimName': 'test-volume'}}
        volume = Volume(name='test-volume', configs=volume_config)
        expected_init_container = {
            'name': 'init-container',
            'image': 'ubuntu:16.04',
            'command': ['bash', '-cx'],
            'args': ['echo 10'],
            'env': [{'name': 'key1', 'value': 'value1'}, {'name': 'key2', 'value': 'value2'}],
            'volumeMounts': [{'mountPath': '/etc/foo', 'name': 'test-volume', 'readOnly': True}],
        }
        k = KubernetesPodOperator(
            namespace='default',
            image="ubuntu:16.04",
            cmds=["bash", "-cx"],
            arguments=["echo 10"],
            labels={"foo": "bar"},
            name="test",
            task_id="task",
            volumes=[volume],
            init_containers=[init_container],
            in_cluster=False,
            do_xcom_push=False,
        )
        context = create_context(k)
        k.execute(context)
        actual_pod = self.api_client.sanitize_for_serialization(k.pod)
        self.expected_pod['spec']['initContainers'] = [expected_init_container]
        self.expected_pod['spec']['volumes'] = [
            {'name': 'test-volume', 'persistentVolumeClaim': {'claimName': 'test-volume'}}
        ]
        assert self.expected_pod == actual_pod