# blob: 65bda6ade8ade0879ea4841ef10be62922a7c8de [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations
from typing import TYPE_CHECKING
from unittest.mock import MagicMock
import jinja2
import pytest
from airflow.notifications.basenotifier import BaseNotifier
from airflow.operators.empty import EmptyOperator
# Module-level pytest marker: applies the ``db_test`` mark to every test in this file.
pytestmark = pytest.mark.db_test
if TYPE_CHECKING:
from airflow.utils.context import Context
class MockNotifier(BaseNotifier):
    """Minimal ``BaseNotifier`` subclass used as a test double.

    ``message`` is declared as the only template field and ``.txt`` as the
    only template file extension. ``notify`` is a no-op.
    """

    template_ext = (".txt",)
    template_fields = ("message",)

    def __init__(self, message: str | None = "This is a test message"):
        # The base initializer runs before ``message`` is assigned; this
        # ordering is kept deliberately.
        # NOTE(review): confirm whether BaseNotifier.__init__ relies on it.
        super().__init__()
        self.message = message

    def notify(self, context: Context) -> None:
        """Intentionally do nothing with ``context``."""
        return None
class TestBaseNotifier:
    """Tests for template rendering and invocation of notifiers, exercised through ``MockNotifier``."""

    def test_render_message_with_message(self, dag_maker):
        """An inline Jinja expression in ``message`` is rendered from the given context."""
        with dag_maker("test_render_message_with_message") as dag:
            EmptyOperator(task_id="test_id")

        notifier = MockNotifier(message="Hello {{ dag.dag_id }}")
        context: Context = {"dag": dag}
        notifier.render_template_fields(context)

        assert notifier.message == "Hello test_render_message_with_message"

    def test_render_message_with_template(self, dag_maker):
        """Rendering a ``.txt`` message is expected to raise ``TemplateNotFound`` for ``test.txt``."""
        with dag_maker("test_render_message_with_template") as dag:
            EmptyOperator(task_id="test_id")

        notifier = MockNotifier(message="test.txt")
        context: Context = {"dag": dag}

        with pytest.raises(jinja2.exceptions.TemplateNotFound):
            notifier.render_template_fields(context)

    def test_render_message_with_template_works(self, dag_maker):
        """A message naming ``test_notifier.txt`` is rendered to a greeting containing the DAG id.

        NOTE(review): presumably ``test_notifier.txt`` is a template file that is
        resolvable from the DAG's template search path -- confirm its location.
        """
        with dag_maker("test_render_message_with_template_works") as dag:
            EmptyOperator(task_id="test_id")

        notifier = MockNotifier(message="test_notifier.txt")
        context: Context = {"dag": dag}
        notifier.render_template_fields(context)

        assert notifier.message == "Hello test_render_message_with_template_works"

    def test_notifier_call_with_passed_context(self, dag_maker):
        """Calling the notifier with one context argument forwards that context to ``notify``.

        The context handed to ``notify`` is expected to carry the unrendered
        ``message`` template, while ``notifier.message`` itself ends up rendered.
        """
        # NOTE(review): the DAG id is reused from test_render_message_with_template_works;
        # the final assertion depends on this exact id.
        with dag_maker("test_render_message_with_template_works") as dag:
            EmptyOperator(task_id="test_id")

        notifier = MockNotifier(message="Hello {{ dag.dag_id }}")
        # Replace ``notify`` with a mock so the call arguments can be inspected.
        notifier.notify = MagicMock()
        context: Context = {"dag": dag}

        notifier(context)

        notifier.notify.assert_called_once_with({"dag": dag, "message": "Hello {{ dag.dag_id }}"})
        assert notifier.message == "Hello test_render_message_with_template_works"

    def test_notifier_call_with_prepared_context(self, dag_maker):
        """Calling the notifier with five positional arguments builds the context from them.

        The expected context maps the arguments, in order, to ``dag``,
        ``task_list``, ``blocking_task_list``, ``slas`` and ``blocking_tis``,
        plus the unrendered ``message`` template.
        """
        # The DAG created here is not passed to the notifier; ``None`` is used as ``dag``.
        with dag_maker("test_render_message_with_template_works"):
            EmptyOperator(task_id="test_id")

        notifier = MockNotifier(message="task: {{ task_list[0] }}")
        # Replace ``notify`` with a mock so the call arguments can be inspected.
        notifier.notify = MagicMock()

        notifier(None, ["some_task"], None, None, None)

        notifier.notify.assert_called_once_with(
            {
                "dag": None,
                "task_list": ["some_task"],
                "blocking_task_list": None,
                "slas": None,
                "blocking_tis": None,
                "message": "task: {{ task_list[0] }}",
            }
        )
        assert notifier.message == "task: some_task"