| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| from __future__ import annotations |
| |
| import time |
| |
| import pytest |
| |
| from kubernetes_tests.test_base import EXECUTOR, BaseK8STest # isort:skip (needed to workaround isort bug) |
| |
| |
| @pytest.mark.skipif(EXECUTOR != "KubernetesExecutor", reason="Only runs on KubernetesExecutor") |
| class TestKubernetesExecutor(BaseK8STest): |
| @pytest.mark.execution_timeout(300) |
| def test_integration_run_dag(self): |
| dag_id = "example_kubernetes_executor" |
| dag_run_id, execution_date = self.start_job_in_kubernetes(dag_id, self.host) |
| print(f"Found the job with execution_date {execution_date}") |
| |
| # Wait some time for the operator to complete |
| self.monitor_task( |
| host=self.host, |
| dag_run_id=dag_run_id, |
| dag_id=dag_id, |
| task_id="start_task", |
| expected_final_state="success", |
| timeout=300, |
| ) |
| |
| self.ensure_dag_expected_state( |
| host=self.host, |
| execution_date=execution_date, |
| dag_id=dag_id, |
| expected_final_state="success", |
| timeout=300, |
| ) |
| |
| @pytest.mark.execution_timeout(300) |
| def test_integration_run_dag_with_scheduler_failure(self): |
| dag_id = "example_kubernetes_executor" |
| |
| dag_run_id, execution_date = self.start_job_in_kubernetes(dag_id, self.host) |
| |
| self._delete_airflow_pod("scheduler") |
| |
| time.sleep(10) # give time for pod to restart |
| |
| # Wait some time for the operator to complete |
| self.monitor_task( |
| host=self.host, |
| dag_run_id=dag_run_id, |
| dag_id=dag_id, |
| task_id="start_task", |
| expected_final_state="success", |
| timeout=300, |
| ) |
| |
| self.monitor_task( |
| host=self.host, |
| dag_run_id=dag_run_id, |
| dag_id=dag_id, |
| task_id="other_namespace_task", |
| expected_final_state="success", |
| timeout=300, |
| ) |
| |
| self.ensure_dag_expected_state( |
| host=self.host, |
| execution_date=execution_date, |
| dag_id=dag_id, |
| expected_final_state="success", |
| timeout=300, |
| ) |
| |
| assert self._num_pods_in_namespace("test-namespace") == 0, "failed to delete pods in other namespace" |