| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| """Tests for the callback supervisor module.""" |
| |
| from __future__ import annotations |
| |
| import pytest |
| import structlog |
| |
| from airflow.sdk.execution_time.callback_supervisor import execute_callback |
| |
| |
| def callback_no_args(): |
| """A simple callback that takes no arguments.""" |
| return "ok" |
| |
| |
| def callback_with_kwargs(arg1, arg2): |
| """A callback that accepts keyword arguments.""" |
| return f"{arg1}-{arg2}" |
| |
| |
| def callback_that_raises(): |
| """A callback that always raises.""" |
| raise ValueError("something went wrong") |
| |
| |
| class CallableClass: |
| """A class that returns a callable instance (like BaseNotifier).""" |
| |
| def __init__(self, **kwargs): |
| self.kwargs = kwargs |
| |
| def __call__(self, context): |
| return "notified" |
| |
| |
| class TestExecuteCallback: |
| @pytest.mark.parametrize( |
| ("path", "kwargs", "expect_success", "error_contains"), |
| [ |
| pytest.param( |
| f"{__name__}.callback_no_args", |
| {}, |
| True, |
| None, |
| id="successful_no_args", |
| ), |
| pytest.param( |
| f"{__name__}.callback_with_kwargs", |
| {"arg1": "hello", "arg2": "world"}, |
| True, |
| None, |
| id="successful_with_kwargs", |
| ), |
| pytest.param( |
| f"{__name__}.CallableClass", |
| {"msg": "alert"}, |
| True, |
| None, |
| id="callable_class_pattern", |
| ), |
| pytest.param( |
| "", |
| {}, |
| False, |
| "Callback path not found", |
| id="empty_path", |
| ), |
| pytest.param( |
| "nonexistent.module.function", |
| {}, |
| False, |
| "ModuleNotFoundError", |
| id="import_error", |
| ), |
| pytest.param( |
| f"{__name__}.callback_that_raises", |
| {}, |
| False, |
| "ValueError", |
| id="execution_error", |
| ), |
| pytest.param( |
| f"{__name__}.nonexistent_function_xyz", |
| {}, |
| False, |
| "AttributeError", |
| id="attribute_error", |
| ), |
| ], |
| ) |
| def test_execute_callback(self, path, kwargs, expect_success, error_contains): |
| log = structlog.get_logger() |
| success, error = execute_callback(path, kwargs, log) |
| |
| assert success is expect_success |
| if error_contains: |
| assert error_contains in error |
| else: |
| assert error is None |