blob: 93b459d200e8708e85c311363db010713ecfec5a [file]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""Tests for the callback supervisor module."""
from __future__ import annotations
import pytest
import structlog
from airflow.sdk.execution_time.callback_supervisor import execute_callback
def callback_no_args():
"""A simple callback that takes no arguments."""
return "ok"
def callback_with_kwargs(arg1, arg2):
"""A callback that accepts keyword arguments."""
return f"{arg1}-{arg2}"
def callback_that_raises():
"""A callback that always raises."""
raise ValueError("something went wrong")
class CallableClass:
"""A class that returns a callable instance (like BaseNotifier)."""
def __init__(self, **kwargs):
self.kwargs = kwargs
def __call__(self, context):
return "notified"
class TestExecuteCallback:
@pytest.mark.parametrize(
("path", "kwargs", "expect_success", "error_contains"),
[
pytest.param(
f"{__name__}.callback_no_args",
{},
True,
None,
id="successful_no_args",
),
pytest.param(
f"{__name__}.callback_with_kwargs",
{"arg1": "hello", "arg2": "world"},
True,
None,
id="successful_with_kwargs",
),
pytest.param(
f"{__name__}.CallableClass",
{"msg": "alert"},
True,
None,
id="callable_class_pattern",
),
pytest.param(
"",
{},
False,
"Callback path not found",
id="empty_path",
),
pytest.param(
"nonexistent.module.function",
{},
False,
"ModuleNotFoundError",
id="import_error",
),
pytest.param(
f"{__name__}.callback_that_raises",
{},
False,
"ValueError",
id="execution_error",
),
pytest.param(
f"{__name__}.nonexistent_function_xyz",
{},
False,
"AttributeError",
id="attribute_error",
),
],
)
def test_execute_callback(self, path, kwargs, expect_success, error_contains):
log = structlog.get_logger()
success, error = execute_callback(path, kwargs, log)
assert success is expect_success
if error_contains:
assert error_contains in error
else:
assert error is None