blob: 73a4be5ea48a217c420f60c02051502210ba22a7 [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations
import pendulum
import pytest
import time_machine
from airflow.timetables.base import DataInterval, TimeRestriction
from airflow.timetables.simple import ContinuousTimetable
BEFORE_DATE = pendulum.datetime(2023, 3, 1, tz="UTC")
START_DATE = pendulum.datetime(2023, 3, 3, tz="UTC")
DURING_DATE = pendulum.datetime(2023, 3, 6, tz="UTC")
END_DATE = pendulum.datetime(2023, 3, 10, tz="UTC")
AFTER_DATE = pendulum.datetime(2023, 3, 12, tz="UTC")
@pytest.fixture
def restriction():
return TimeRestriction(earliest=START_DATE, latest=END_DATE, catchup=True)
@pytest.fixture
def timetable():
return ContinuousTimetable()
def test_no_runs_without_start_date(timetable):
next_info = timetable.next_dagrun_info(
last_automated_data_interval=None,
restriction=TimeRestriction(earliest=None, latest=None, catchup=False),
)
assert next_info is None
@time_machine.travel(DURING_DATE)
def test_first_run_after_start_date_correct_interval(timetable, restriction):
next_info = timetable.next_dagrun_info(
last_automated_data_interval=None,
restriction=restriction,
)
assert next_info.run_after == DURING_DATE
assert next_info.data_interval.start == START_DATE
assert next_info.data_interval.end == DURING_DATE
@time_machine.travel(BEFORE_DATE)
def test_first_run_before_start_date_correct_interval(timetable, restriction):
next_info = timetable.next_dagrun_info(
last_automated_data_interval=None,
restriction=restriction,
)
assert next_info.run_after == START_DATE
assert next_info.data_interval.start == START_DATE
assert next_info.data_interval.end == START_DATE
@time_machine.travel(DURING_DATE)
def test_run_uses_utcnow(timetable, restriction):
next_info = timetable.next_dagrun_info(
last_automated_data_interval=DataInterval(START_DATE, DURING_DATE),
restriction=restriction,
)
assert next_info.run_after == DURING_DATE
@time_machine.travel(AFTER_DATE)
def test_no_runs_after_end_date(timetable, restriction):
next_info = timetable.next_dagrun_info(
last_automated_data_interval=DataInterval(START_DATE, DURING_DATE),
restriction=restriction,
)
assert next_info is None
@time_machine.travel(BEFORE_DATE)
def test_no_false_triggering_with_future_start_date_after_run(timetable, restriction):
next_info = timetable.next_dagrun_info(
last_automated_data_interval=DataInterval(BEFORE_DATE, BEFORE_DATE.add(hours=1)),
restriction=restriction,
)
assert next_info is not None
assert next_info.data_interval.start == START_DATE