| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| |
| import time |
| |
| import pytest |
| |
| from kubernetes_tests.test_base import EXECUTOR, TestBase |
| |
| |
| @pytest.mark.skipif(EXECUTOR != 'KubernetesExecutor', reason="Only runs on KubernetesExecutor") |
| class TestKubernetesExecutor(TestBase): |
| def test_integration_run_dag(self): |
| dag_id = 'example_kubernetes_executor' |
| dag_run_id, execution_date = self.start_job_in_kubernetes(dag_id, self.host) |
| print(f"Found the job with execution_date {execution_date}") |
| |
| # Wait some time for the operator to complete |
| self.monitor_task( |
| host=self.host, |
| dag_run_id=dag_run_id, |
| dag_id=dag_id, |
| task_id='start_task', |
| expected_final_state='success', |
| timeout=300, |
| ) |
| |
| self.ensure_dag_expected_state( |
| host=self.host, |
| execution_date=execution_date, |
| dag_id=dag_id, |
| expected_final_state='success', |
| timeout=300, |
| ) |
| |
| def test_integration_run_dag_with_scheduler_failure(self): |
| dag_id = 'example_kubernetes_executor' |
| |
| dag_run_id, execution_date = self.start_job_in_kubernetes(dag_id, self.host) |
| |
| self._delete_airflow_pod("scheduler") |
| |
| time.sleep(10) # give time for pod to restart |
| |
| # Wait some time for the operator to complete |
| self.monitor_task( |
| host=self.host, |
| dag_run_id=dag_run_id, |
| dag_id=dag_id, |
| task_id='start_task', |
| expected_final_state='success', |
| timeout=300, |
| ) |
| |
| self.monitor_task( |
| host=self.host, |
| dag_run_id=dag_run_id, |
| dag_id=dag_id, |
| task_id='other_namespace_task', |
| expected_final_state='success', |
| timeout=300, |
| ) |
| |
| self.ensure_dag_expected_state( |
| host=self.host, |
| execution_date=execution_date, |
| dag_id=dag_id, |
| expected_final_state='success', |
| timeout=300, |
| ) |
| |
| assert self._num_pods_in_namespace('test-namespace') == 0, "failed to delete pods in other namespace" |