blob: b19cc4baa873bf59b8754ea775e94760cd729739 [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations
import tempfile
from unittest import mock
import pytest
from airflow.configuration import conf
from airflow.models import SlaMiss
from airflow.operators.empty import EmptyOperator
from airflow.providers.smtp.hooks.smtp import SmtpHook
from airflow.providers.smtp.notifications.smtp import (
SmtpNotifier,
send_smtp_notification,
)
from airflow.utils import timezone
from tests.test_utils.config import conf_vars
pytestmark = pytest.mark.db_test
SMTP_API_DEFAULT_CONN_ID = SmtpHook.default_conn_name
class TestSmtpNotifier:
@mock.patch("airflow.providers.smtp.notifications.smtp.SmtpHook")
def test_notifier(self, mock_smtphook_hook, dag_maker):
with dag_maker("test_notifier") as dag:
EmptyOperator(task_id="task1")
notifier = send_smtp_notification(
from_email="test_sender@test.com",
to="test_reciver@test.com",
subject="subject",
html_content="body",
)
notifier({"dag": dag})
mock_smtphook_hook.return_value.__enter__().send_email_smtp.assert_called_once_with(
from_email="test_sender@test.com",
to="test_reciver@test.com",
subject="subject",
html_content="body",
smtp_conn_id="smtp_default",
files=None,
cc=None,
bcc=None,
mime_subtype="mixed",
mime_charset="utf-8",
custom_headers=None,
)
@mock.patch("airflow.providers.smtp.notifications.smtp.SmtpHook")
def test_notifier_with_notifier_class(self, mock_smtphook_hook, dag_maker):
with dag_maker("test_notifier") as dag:
EmptyOperator(task_id="task1")
notifier = SmtpNotifier(
from_email="test_sender@test.com",
to="test_reciver@test.com",
subject="subject",
html_content="body",
)
notifier({"dag": dag})
mock_smtphook_hook.return_value.__enter__().send_email_smtp.assert_called_once_with(
from_email="test_sender@test.com",
to="test_reciver@test.com",
subject="subject",
html_content="body",
smtp_conn_id="smtp_default",
files=None,
cc=None,
bcc=None,
mime_subtype="mixed",
mime_charset="utf-8",
custom_headers=None,
)
@mock.patch("airflow.providers.smtp.notifications.smtp.SmtpHook")
def test_notifier_templated(self, mock_smtphook_hook, dag_maker):
with dag_maker("test_notifier") as dag:
EmptyOperator(task_id="task1")
notifier = SmtpNotifier(
from_email="test_sender@test.com {{dag.dag_id}}",
to="test_reciver@test.com {{dag.dag_id}}",
subject="subject {{dag.dag_id}}",
html_content="body {{dag.dag_id}}",
)
context = {"dag": dag}
notifier(context)
mock_smtphook_hook.return_value.__enter__().send_email_smtp.assert_called_once_with(
from_email="test_sender@test.com test_notifier",
to="test_reciver@test.com test_notifier",
subject="subject test_notifier",
html_content="body test_notifier",
smtp_conn_id="smtp_default",
files=None,
cc=None,
bcc=None,
mime_subtype="mixed",
mime_charset="utf-8",
custom_headers=None,
)
@mock.patch("airflow.providers.smtp.notifications.smtp.SmtpHook")
def test_notifier_with_defaults(self, mock_smtphook_hook, create_task_instance):
ti = create_task_instance(dag_id="dag", task_id="op", execution_date=timezone.datetime(2018, 1, 1))
context = {"dag": ti.dag_run.dag, "ti": ti}
notifier = SmtpNotifier(
from_email=conf.get("smtp", "smtp_mail_from"),
to="test_reciver@test.com",
)
notifier(context)
mock_smtphook_hook.return_value.__enter__().send_email_smtp.assert_called_once_with(
from_email=conf.get("smtp", "smtp_mail_from"),
to="test_reciver@test.com",
subject="DAG dag - Task op - Run ID test in State None",
html_content="""<!DOCTYPE html>\n<html>\n <head>\n <meta http-equiv="Content-Type" content="text/html; charset=utf-8" />\n <meta name="viewport" content="width=device-width">\n </head>\n<body>\n <table role="presentation">\n \n <tr>\n <td>Run ID:</td>\n <td>test</td>\n </tr>\n <tr>\n <td>Try:</td>\n <td>0 of 1</td>\n </tr>\n <tr>\n <td>Task State:</td>\n <td>None</td>\n </tr>\n <tr>\n <td>Host:</td>\n <td></td>\n </tr>\n <tr>\n <td>Log Link:</td>\n <td><a href="http://localhost:8080/dags/dag/grid?dag_run_id=test&task_id=op&map_index=-1&tab=logs" style="text-decoration:underline;">http://localhost:8080/dags/dag/grid?dag_run_id=test&task_id=op&map_index=-1&tab=logs</a></td>\n </tr>\n <tr>\n <td>Mark Success Link:</td>\n <td><a href="http://localhost:8080/confirm?task_id=op&dag_id=dag&dag_run_id=test&upstream=false&downstream=false&state=success" style="text-decoration:underline;">http://localhost:8080/confirm?task_id=op&dag_id=dag&dag_run_id=test&upstream=false&downstream=false&state=success</a></td>\n </tr>\n \n </table>\n</body>\n</html>""",
smtp_conn_id="smtp_default",
files=None,
cc=None,
bcc=None,
mime_subtype="mixed",
mime_charset="utf-8",
custom_headers=None,
)
@mock.patch("airflow.providers.smtp.notifications.smtp.SmtpHook")
def test_notifier_with_defaults_sla(self, mock_smtphook_hook, dag_maker):
with dag_maker("test_notifier") as dag:
EmptyOperator(task_id="task1")
context = {
"dag": dag,
"slas": [SlaMiss(task_id="op", dag_id=dag.dag_id, execution_date=timezone.datetime(2018, 1, 1))],
"task_list": [],
"blocking_task_list": [],
"blocking_tis": [],
}
notifier = SmtpNotifier(
from_email=conf.get("smtp", "smtp_mail_from"),
to="test_reciver@test.com",
)
notifier(context)
mock_smtphook_hook.return_value.__enter__().send_email_smtp.assert_called_once_with(
from_email=conf.get("smtp", "smtp_mail_from"),
to="test_reciver@test.com",
subject="SLA Missed for DAG test_notifier - Task op",
html_content="""<!DOCTYPE html>\n<html>\n <head>\n <meta http-equiv="Content-Type" content="text/html; charset=utf-8" />\n <meta name="viewport" content="width=device-width">\n </head>\n<body>\n <table role="presentation">\n \n <tr>\n <td>Dag:</td>\n <td>test_notifier</td>\n </tr>\n <tr>\n <td>Task List:</td>\n <td>[]</td>\n </tr>\n <tr>\n <td>Blocking Task List:</td>\n <td>[]</td>\n </tr>\n <tr>\n <td>SLAs:</td>\n <td>[(\'test_notifier\', \'op\', \'2018-01-01T00:00:00+00:00\')]</td>\n </tr>\n <tr>\n <td>Blocking TI\'s</td>\n <td>[]</td>\n </tr>\n \n </table>\n</body>\n</html>""",
smtp_conn_id="smtp_default",
files=None,
cc=None,
bcc=None,
mime_subtype="mixed",
mime_charset="utf-8",
custom_headers=None,
)
@mock.patch("airflow.providers.smtp.notifications.smtp.SmtpHook")
def test_notifier_with_nondefault_conf_vars(self, mock_smtphook_hook, create_task_instance):
ti = create_task_instance(dag_id="dag", task_id="op", execution_date=timezone.datetime(2018, 1, 1))
context = {"dag": ti.dag_run.dag, "ti": ti}
with tempfile.NamedTemporaryFile(mode="wt", suffix=".txt") as f_subject, tempfile.NamedTemporaryFile(
mode="wt", suffix=".txt"
) as f_content:
f_subject.write("Task {{ ti.task_id }} failed")
f_subject.flush()
f_content.write("Mock content goes here")
f_content.flush()
with conf_vars(
{
("smtp", "templated_html_content_path"): f_content.name,
("smtp", "templated_email_subject_path"): f_subject.name,
}
):
notifier = SmtpNotifier(
from_email=conf.get("smtp", "smtp_mail_from"),
to="test_reciver@test.com",
)
notifier(context)
mock_smtphook_hook.return_value.__enter__().send_email_smtp.assert_called_once_with(
from_email=conf.get("smtp", "smtp_mail_from"),
to="test_reciver@test.com",
subject="Task op failed",
html_content="Mock content goes here",
smtp_conn_id="smtp_default",
files=None,
cc=None,
bcc=None,
mime_subtype="mixed",
mime_charset="utf-8",
custom_headers=None,
)