blob: 1faf1ceea8a5aaa33552570f64a180e1a0088d91 [file] [log] [blame]
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
import unittest
import pytest
from parameterized import parameterized
from airflow.exceptions import AirflowSensorTimeout
from airflow.models import DagBag
from airflow.models.dag import DAG
from airflow.sensors.weekday import DayOfWeekSensor
from airflow.utils.timezone import datetime
from airflow.utils.weekday import WeekDay
from tests.test_utils import db
DEFAULT_DATE = datetime(2018, 12, 10)
WEEKDAY_DATE = datetime(2018, 12, 20)
WEEKEND_DATE = datetime(2018, 12, 22)
TEST_DAG_ID = 'weekday_sensor_dag'
DEV_NULL = '/dev/null'
class TestDayOfWeekSensor(unittest.TestCase):
@staticmethod
def clean_db():
db.clear_db_runs()
db.clear_db_task_fail()
def setUp(self):
self.clean_db()
self.dagbag = DagBag(dag_folder=DEV_NULL, include_examples=True)
self.args = {'owner': 'airflow', 'start_date': DEFAULT_DATE}
dag = DAG(TEST_DAG_ID, default_args=self.args)
self.dag = dag
def tearDown(self):
self.clean_db()
@parameterized.expand(
[
("with-string", 'Thursday'),
("with-enum", WeekDay.THURSDAY),
("with-enum-set", {WeekDay.THURSDAY}),
("with-enum-set-2-items", {WeekDay.THURSDAY, WeekDay.FRIDAY}),
("with-string-set", {'Thursday'}),
("with-string-set-2-items", {'Thursday', 'Friday'}),
]
)
def test_weekday_sensor_true(self, _, week_day):
op = DayOfWeekSensor(
task_id='weekday_sensor_check_true', week_day=week_day, use_task_execution_day=True, dag=self.dag
)
op.run(start_date=WEEKDAY_DATE, end_date=WEEKDAY_DATE, ignore_ti_state=True)
assert op.week_day == week_day
def test_weekday_sensor_false(self):
op = DayOfWeekSensor(
task_id='weekday_sensor_check_false',
poke_interval=1,
timeout=2,
week_day='Tuesday',
use_task_execution_day=True,
dag=self.dag,
)
with pytest.raises(AirflowSensorTimeout):
op.run(start_date=WEEKDAY_DATE, end_date=WEEKDAY_DATE, ignore_ti_state=True)
def test_invalid_weekday_number(self):
invalid_week_day = 'Thsday'
with pytest.raises(AttributeError, match=f'Invalid Week Day passed: "{invalid_week_day}"'):
DayOfWeekSensor(
task_id='weekday_sensor_invalid_weekday_num',
week_day=invalid_week_day,
use_task_execution_day=True,
dag=self.dag,
)
def test_weekday_sensor_with_invalid_type(self):
invalid_week_day = ['Thsday']
with pytest.raises(
TypeError,
match='Unsupported Type for week_day parameter:'
' {}. It should be one of str, set or '
'Weekday enum type'.format(type(invalid_week_day)),
):
DayOfWeekSensor(
task_id='weekday_sensor_check_true',
week_day=invalid_week_day,
use_task_execution_day=True,
dag=self.dag,
)
def test_weekday_sensor_timeout_with_set(self):
op = DayOfWeekSensor(
task_id='weekday_sensor_check_false',
poke_interval=1,
timeout=2,
week_day={WeekDay.MONDAY, WeekDay.TUESDAY},
use_task_execution_day=True,
dag=self.dag,
)
with pytest.raises(AirflowSensorTimeout):
op.run(start_date=WEEKDAY_DATE, end_date=WEEKDAY_DATE, ignore_ti_state=True)