blob: ebe5ae6c40975a5389026ce0896f51a68519399c [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations
from unittest import mock
import pytest
from airflow.exceptions import AirflowConfigException
from airflow.executors import executor_loader
from airflow.executors.executor_loader import ConnectorSource, ExecutorName
from airflow.executors.local_executor import LocalExecutor
from airflow.providers.amazon.aws.executors.ecs.ecs_executor import AwsEcsExecutor
from airflow.providers.celery.executors.celery_executor import CeleryExecutor
from tests_common.test_utils.config import conf_vars
class FakeExecutor:
pass
@pytest.mark.usefixtures("clean_executor_loader")
class TestExecutorLoader:
def test_no_executor_configured(self):
with conf_vars({("core", "executor"): None}):
with pytest.raises(AirflowConfigException, match=r".*not found in config$"):
executor_loader.ExecutorLoader.get_default_executor()
@pytest.mark.parametrize(
"executor_name",
[
"CeleryExecutor",
"DebugExecutor",
"KubernetesExecutor",
"LocalExecutor",
],
)
def test_should_support_executor_from_core(self, executor_name):
with conf_vars({("core", "executor"): executor_name}):
executor = executor_loader.ExecutorLoader.get_default_executor()
assert executor is not None
assert executor_name == executor.__class__.__name__
assert executor.name is not None
assert executor.name == ExecutorName(
executor_loader.ExecutorLoader.executors[executor_name], alias=executor_name
)
assert executor.name.connector_source == ConnectorSource.CORE
def test_should_support_custom_path(self):
with conf_vars({("core", "executor"): "tests.executors.test_executor_loader.FakeExecutor"}):
executor = executor_loader.ExecutorLoader.get_default_executor()
assert executor is not None
assert executor.__class__.__name__ == "FakeExecutor"
assert executor.name is not None
assert executor.name == ExecutorName("tests.executors.test_executor_loader.FakeExecutor")
assert executor.name.connector_source == ConnectorSource.CUSTOM_PATH
@pytest.mark.parametrize(
("executor_config", "team_executor_config", "expected_executors_list"),
[
pytest.param(
"CeleryExecutor",
[],
[
ExecutorName(
"airflow.providers.celery.executors.celery_executor.CeleryExecutor",
"CeleryExecutor",
),
],
id="one_executor",
),
pytest.param(
"CeleryExecutor",
[
("team_a", ["CeleryExecutor"]),
("team_b", ["LocalExecutor"]),
],
[
ExecutorName(
"airflow.providers.celery.executors.celery_executor.CeleryExecutor",
"CeleryExecutor",
),
ExecutorName(
"airflow.providers.celery.executors.celery_executor.CeleryExecutor",
"CeleryExecutor",
"team_a",
),
ExecutorName(
"airflow.executors.local_executor.LocalExecutor",
"LocalExecutor",
"team_b",
),
],
id="one_executor_per_team",
),
pytest.param(
"CeleryExecutor, LocalExecutor, tests.executors.test_executor_loader.FakeExecutor",
[],
[
ExecutorName(
"airflow.providers.celery.executors.celery_executor.CeleryExecutor",
"CeleryExecutor",
),
ExecutorName(
"airflow.executors.local_executor.LocalExecutor",
"LocalExecutor",
),
ExecutorName(
"tests.executors.test_executor_loader.FakeExecutor",
None,
),
],
id="core_executors_and_custom_module_path_executor",
),
pytest.param(
"CeleryExecutor, LocalExecutor, tests.executors.test_executor_loader.FakeExecutor",
[
("team_a", ["CeleryExecutor", "tests.executors.test_executor_loader.FakeExecutor"]),
("team_b", ["tests.executors.test_executor_loader.FakeExecutor"]),
],
[
ExecutorName(
"airflow.providers.celery.executors.celery_executor.CeleryExecutor",
"CeleryExecutor",
),
ExecutorName(
"airflow.executors.local_executor.LocalExecutor",
"LocalExecutor",
),
ExecutorName(
"tests.executors.test_executor_loader.FakeExecutor",
None,
),
ExecutorName(
"airflow.providers.celery.executors.celery_executor.CeleryExecutor",
"CeleryExecutor",
"team_a",
),
ExecutorName(
"tests.executors.test_executor_loader.FakeExecutor",
None,
"team_a",
),
ExecutorName(
"tests.executors.test_executor_loader.FakeExecutor",
None,
"team_b",
),
],
id="core_executors_and_custom_module_path_executor_per_team",
),
pytest.param(
(
"CeleryExecutor, LocalExecutor, fake_exec:tests.executors.test_executor_loader.FakeExecutor"
),
[],
[
ExecutorName(
"airflow.providers.celery.executors.celery_executor.CeleryExecutor",
"CeleryExecutor",
),
ExecutorName(
"airflow.executors.local_executor.LocalExecutor",
"LocalExecutor",
),
ExecutorName(
"tests.executors.test_executor_loader.FakeExecutor",
"fake_exec",
),
],
id="core_executors_and_custom_module_path_executor_with_aliases",
),
pytest.param(
(
"CeleryExecutor, LocalExecutor, fake_exec:tests.executors.test_executor_loader.FakeExecutor"
),
[
(
"team_a",
["CeleryExecutor", "fake_exec:tests.executors.test_executor_loader.FakeExecutor"],
),
("team_b", ["fake_exec:tests.executors.test_executor_loader.FakeExecutor"]),
],
[
ExecutorName(
"airflow.providers.celery.executors.celery_executor.CeleryExecutor",
"CeleryExecutor",
),
ExecutorName(
"airflow.executors.local_executor.LocalExecutor",
"LocalExecutor",
),
ExecutorName(
"tests.executors.test_executor_loader.FakeExecutor",
"fake_exec",
),
ExecutorName(
"airflow.providers.celery.executors.celery_executor.CeleryExecutor",
"CeleryExecutor",
"team_a",
),
ExecutorName(
"tests.executors.test_executor_loader.FakeExecutor",
"fake_exec",
"team_a",
),
ExecutorName(
"tests.executors.test_executor_loader.FakeExecutor",
"fake_exec",
"team_b",
),
],
id="core_executors_and_custom_module_path_executor_with_aliases_per_team",
),
],
)
def test_get_hybrid_executors_from_config(
self, executor_config, team_executor_config, expected_executors_list
):
with conf_vars({("core", "executor"): executor_config}):
with mock.patch(
"airflow.executors.executor_loader.ExecutorLoader._get_team_executor_configs",
return_value=team_executor_config,
):
executors = executor_loader.ExecutorLoader._get_executor_names()
assert executors == expected_executors_list
def test_init_executors(self):
with conf_vars({("core", "executor"): "CeleryExecutor"}):
executors = executor_loader.ExecutorLoader.init_executors()
executor_name = executor_loader.ExecutorLoader.get_default_executor_name()
assert len(executors) == 1
assert isinstance(executors[0], CeleryExecutor)
assert "CeleryExecutor" in executor_loader.ExecutorLoader.executors
assert executor_loader.ExecutorLoader.executors["CeleryExecutor"] == executor_name.module_path
@pytest.mark.parametrize(
"executor_config",
[
"CeleryExecutor, LocalExecutor, CeleryExecutor",
"CeleryExecutor, LocalExecutor, LocalExecutor",
"CeleryExecutor, my.module.path, my.module.path",
"CeleryExecutor, my_alias:my.module.path, my.module.path",
"CeleryExecutor, my_alias:my.module.path, other_alias:my.module.path",
],
)
def test_get_hybrid_executors_from_config_duplicates_should_fail(self, executor_config):
with conf_vars({("core", "executor"): executor_config}):
with pytest.raises(
AirflowConfigException, match=r".+Duplicate executors are not yet supported.+"
):
executor_loader.ExecutorLoader._get_executor_names()
@pytest.mark.parametrize(
"executor_config",
[
"Celery::Executor, LocalExecutor",
"LocalExecutor, Ce:ler:yExecutor, DebugExecutor",
"LocalExecutor, CeleryExecutor:, DebugExecutor",
"LocalExecutor, my_cool_alias:",
"LocalExecutor, my_cool_alias:CeleryExecutor",
"LocalExecutor, module.path.first:alias_second",
],
)
def test_get_hybrid_executors_from_config_core_executors_bad_config_format(self, executor_config):
with conf_vars({("core", "executor"): executor_config}):
with pytest.raises(AirflowConfigException):
executor_loader.ExecutorLoader._get_executor_names()
@pytest.mark.parametrize(
("executor_config", "expected_value"),
[
("CeleryExecutor", "CeleryExecutor"),
("DebugExecutor", "DebugExecutor"),
("KubernetesExecutor", "KubernetesExecutor"),
("LocalExecutor", "LocalExecutor"),
("CeleryExecutor, LocalExecutor", "CeleryExecutor"),
("LocalExecutor, CeleryExecutor, DebugExecutor", "LocalExecutor"),
],
)
def test_should_support_import_executor_from_core(self, executor_config, expected_value):
with conf_vars({("core", "executor"): executor_config}):
executor, import_source = executor_loader.ExecutorLoader.import_default_executor_cls()
assert expected_value == executor.__name__
assert import_source == ConnectorSource.CORE
@pytest.mark.parametrize(
"executor_config",
[
("tests.executors.test_executor_loader.FakeExecutor"),
("tests.executors.test_executor_loader.FakeExecutor, CeleryExecutor"),
("my_cool_alias:tests.executors.test_executor_loader.FakeExecutor, CeleryExecutor"),
],
)
def test_should_support_import_custom_path(self, executor_config):
with conf_vars({("core", "executor"): executor_config}):
executor, import_source = executor_loader.ExecutorLoader.import_default_executor_cls()
assert executor.__name__ == "FakeExecutor"
assert import_source == ConnectorSource.CUSTOM_PATH
def test_load_executor(self):
with conf_vars({("core", "executor"): "LocalExecutor"}):
executor_loader.ExecutorLoader.init_executors()
assert isinstance(executor_loader.ExecutorLoader.load_executor("LocalExecutor"), LocalExecutor)
assert isinstance(
executor_loader.ExecutorLoader.load_executor(executor_loader._executor_names[0]),
LocalExecutor,
)
assert isinstance(executor_loader.ExecutorLoader.load_executor(None), LocalExecutor)
def test_load_executor_alias(self):
with conf_vars({("core", "executor"): "local_exec:airflow.executors.local_executor.LocalExecutor"}):
executor_loader.ExecutorLoader.init_executors()
assert isinstance(executor_loader.ExecutorLoader.load_executor("local_exec"), LocalExecutor)
assert isinstance(
executor_loader.ExecutorLoader.load_executor(
"airflow.executors.local_executor.LocalExecutor"
),
LocalExecutor,
)
assert isinstance(
executor_loader.ExecutorLoader.load_executor(executor_loader._executor_names[0]),
LocalExecutor,
)
@mock.patch(
"airflow.executors.executor_loader.ExecutorLoader._get_executor_names",
wraps=executor_loader.ExecutorLoader._get_executor_names,
)
def test_call_load_executor_method_without_init_executors(self, mock_get_executor_names):
with conf_vars({("core", "executor"): "LocalExecutor"}):
executor_loader.ExecutorLoader.load_executor("LocalExecutor")
mock_get_executor_names.assert_called_once()
@mock.patch("airflow.providers.amazon.aws.executors.ecs.ecs_executor.AwsEcsExecutor", autospec=True)
def test_load_custom_executor_with_classname(self, mock_executor):
with conf_vars(
{
(
"core",
"executor",
): "my_alias:airflow.providers.amazon.aws.executors.ecs.ecs_executor.AwsEcsExecutor"
}
):
executor_loader.ExecutorLoader.init_executors()
assert isinstance(executor_loader.ExecutorLoader.load_executor("my_alias"), AwsEcsExecutor)
assert isinstance(executor_loader.ExecutorLoader.load_executor("AwsEcsExecutor"), AwsEcsExecutor)
assert isinstance(
executor_loader.ExecutorLoader.load_executor(
"airflow.providers.amazon.aws.executors.ecs.ecs_executor.AwsEcsExecutor"
),
AwsEcsExecutor,
)
assert isinstance(
executor_loader.ExecutorLoader.load_executor(executor_loader._executor_names[0]),
AwsEcsExecutor,
)