blob: a4f70c8dfec05fbee2985de84818002a06547767 [file] [log] [blame]
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations
import warnings
from datetime import datetime, timedelta
from typing import Collection
from croniter import croniter
from dateutil.relativedelta import relativedelta # for doctest
from airflow.exceptions import RemovedInAirflow3Warning
from airflow.typing_compat import Literal
from airflow.utils import timezone
# Shorthand schedule aliases that may be passed in place of a cron expression,
# mapped to the equivalent five-field cron string
# (minute, hour, day-of-month, month, day-of-week).
cron_presets: dict[str, str] = {
    "@hourly": "0 * * * *",  # minute 0 of every hour
    "@daily": "0 0 * * *",  # midnight every day
    "@weekly": "0 0 * * 0",  # midnight on day-of-week 0
    "@monthly": "0 0 1 * *",  # midnight on the 1st of every month
    "@quarterly": "0 0 1 */3 *",  # midnight on the 1st of every third month
    "@yearly": "0 0 1 1 *",  # midnight on January 1st
}
def date_range(
    start_date: datetime,
    end_date: datetime | None = None,
    num: int | None = None,
    delta: str | timedelta | relativedelta | None = None,
) -> list[datetime]:
    """Get a list of dates in the specified range, separated by delta.

    Deprecated: every call emits ``RemovedInAirflow3Warning``.

    .. code-block:: pycon

        >>> from airflow.utils.dates import date_range
        >>> from datetime import datetime, timedelta
        >>> date_range(datetime(2016, 1, 1), datetime(2016, 1, 3), delta=timedelta(1))
        [datetime.datetime(2016, 1, 1, 0, 0, tzinfo=Timezone('UTC')),
        datetime.datetime(2016, 1, 2, 0, 0, tzinfo=Timezone('UTC')),
        datetime.datetime(2016, 1, 3, 0, 0, tzinfo=Timezone('UTC'))]
        >>> date_range(datetime(2016, 1, 1), datetime(2016, 1, 3), delta="0 0 * * *")
        [datetime.datetime(2016, 1, 1, 0, 0, tzinfo=Timezone('UTC')),
        datetime.datetime(2016, 1, 2, 0, 0, tzinfo=Timezone('UTC')),
        datetime.datetime(2016, 1, 3, 0, 0, tzinfo=Timezone('UTC'))]
        >>> date_range(datetime(2016, 1, 1), datetime(2016, 3, 3), delta="0 0 1 * *")
        [datetime.datetime(2016, 1, 1, 0, 0, tzinfo=Timezone('UTC')),
        datetime.datetime(2016, 2, 1, 0, 0, tzinfo=Timezone('UTC')),
        datetime.datetime(2016, 3, 1, 0, 0, tzinfo=Timezone('UTC'))]

    :param start_date: anchor date to start the series from. It is always the
        first generated entry, even when it does not fall on the cron schedule
        given by ``delta``.
    :param end_date: right boundary for the date range (inclusive). If neither
        ``end_date`` nor ``num`` is given, it defaults to ``timezone.utcnow()``.
    :param num: alternatively to end_date, you can specify the number of
        entries you want in the range. This number can be negative, in which
        case the series steps backwards from ``start_date``; the output is
        always sorted ascending regardless.
    :param delta: step length. It can be a ``datetime.timedelta``, a
        ``dateutil.relativedelta.relativedelta``, or a cron expression as a
        string. Keys of ``cron_presets`` (e.g. ``"@daily"``) are accepted too.
        A falsy ``delta`` (``None``, ``""``, a zero timedelta) returns ``[]``.
    :raises Exception: if an explicit ``end_date`` is before ``start_date``, if
        both ``end_date`` and ``num`` are given, or if ``delta`` is of an
        unsupported type.
    :return: the generated dates in ascending order. Naive values are passed
        through ``timezone.make_aware`` with ``start_date``'s original tzinfo
        before being collected.
    """
    warnings.warn(
        "`airflow.utils.dates.date_range()` is deprecated. Please use `airflow.timetables`.",
        category=RemovedInAirflow3Warning,
        stacklevel=2,
    )
    # A falsy step (e.g. timedelta(0)) would never advance; bail out early.
    if not delta:
        return []
    if end_date:
        if start_date > end_date:
            raise Exception("Wait. start_date needs to be before end_date")
        if num:
            raise Exception("Wait. Either specify end_date OR num")
    # With no bound at all, generate up to "now".
    if not end_date and not num:
        end_date = timezone.utcnow()
    delta_iscron = False
    # Remember the caller's timezone so naive results can be re-localized below.
    time_zone = start_date.tzinfo
    abs_delta: timedelta | relativedelta
    if isinstance(delta, str):
        delta_iscron = True
        # The croniter is built from a naive start_date (converted in its own
        # timezone); collected entries are made aware again with time_zone.
        if timezone.is_localized(start_date):
            start_date = timezone.make_naive(start_date, time_zone)
        cron = croniter(cron_presets.get(delta, delta), start_date)
    elif isinstance(delta, timedelta):
        abs_delta = abs(delta)
    elif isinstance(delta, relativedelta):
        abs_delta = abs(delta)
    else:
        raise Exception("Wait. delta must be either datetime.timedelta or cron expression as str")
    dates = []
    if end_date:
        # start_date may have been made naive above; bring an aware end_date
        # into the same form so the two can be compared.
        if timezone.is_naive(start_date) and not timezone.is_naive(end_date):
            end_date = timezone.make_naive(end_date, time_zone)
        while start_date <= end_date:  # type: ignore
            if timezone.is_naive(start_date):
                dates.append(timezone.make_aware(start_date, time_zone))
            else:
                dates.append(start_date)
            if delta_iscron:
                start_date = cron.get_next(datetime)
            else:
                start_date += abs_delta
    else:
        # end_date is unset here, which the checks above only allow when num is truthy.
        num_entries: int = num  # type: ignore
        for _ in range(abs(num_entries)):
            if timezone.is_naive(start_date):
                dates.append(timezone.make_aware(start_date, time_zone))
            else:
                dates.append(start_date)
            # A negative num walks backwards; the final sort restores ascending order.
            if delta_iscron and num_entries > 0:
                start_date = cron.get_next(datetime)
            elif delta_iscron:
                start_date = cron.get_prev(datetime)
            elif num_entries > 0:
                start_date += abs_delta
            else:
                start_date -= abs_delta
    return sorted(dates)
def round_time(
    dt: datetime,
    delta: str | timedelta | relativedelta,
    start_date: datetime = timezone.make_aware(datetime.min),
):
    """Returns ``start_date + i * delta`` for given ``i`` where the result is closest to ``dt``.

    For a ``timedelta``/``relativedelta`` step the grid ``start_date + i * delta``
    (``i >= 0``) is searched:

    * the microseconds of ``dt`` are discarded first;
    * on a tie the later grid point is returned;
    * a ``dt`` at or before ``start_date`` yields ``start_date``.

    A string ``delta`` is handed to croniter as a cron expression instead; see
    the note in that branch.

    .. code-block:: pycon

        >>> round_time(datetime(2015, 1, 1, 6), timedelta(days=1))
        datetime.datetime(2015, 1, 1, 0, 0)
        >>> round_time(datetime(2015, 1, 2), relativedelta(months=1))
        datetime.datetime(2015, 1, 1, 0, 0)
        >>> round_time(datetime(2015, 9, 16, 0, 0), timedelta(1), datetime(2015, 9, 14, 0, 0))
        datetime.datetime(2015, 9, 16, 0, 0)
        >>> round_time(datetime(2015, 9, 15, 0, 0), timedelta(1), datetime(2015, 9, 14, 0, 0))
        datetime.datetime(2015, 9, 15, 0, 0)
        >>> round_time(datetime(2015, 9, 14, 0, 0), timedelta(1), datetime(2015, 9, 14, 0, 0))
        datetime.datetime(2015, 9, 14, 0, 0)
        >>> round_time(datetime(2015, 9, 13, 0, 0), timedelta(1), datetime(2015, 9, 14, 0, 0))
        datetime.datetime(2015, 9, 14, 0, 0)

    NOTE(review): the default ``start_date`` is produced by
    ``timezone.make_aware`` and is presumably timezone-aware. The first two
    examples pass a naive ``dt`` together with that default. Comparing naive
    and aware datetimes raises ``TypeError``, so confirm those examples, or
    pass an aware ``dt`` or an explicit naive ``start_date``.

    :param dt: the datetime to round
    :param delta: grid step: a timedelta, a relativedelta, or a cron expression string
    :param start_date: anchor of the grid; defaults to ``datetime.min`` passed
        through ``timezone.make_aware`` (evaluated once, at import time)
    :return: the grid point closest to ``dt``
    """
    if isinstance(delta, str):
        # It's cron based, so it's easy
        # NOTE(review): this branch never reads ``dt``. It returns the cron tick
        # preceding ``start_date``, not the tick closest to ``dt``. With the
        # default ``start_date`` (year 1) that is unlikely to be what callers
        # want. Confirm the intended semantics before relying on cron deltas here.
        time_zone = start_date.tzinfo
        start_date = timezone.make_naive(start_date, time_zone)
        cron = croniter(delta, start_date)
        prev = cron.get_prev(datetime)
        # NOTE(review): croniter.get_prev presumably returns a time strictly
        # before its start, which would make this first branch unreachable.
        if prev == start_date:
            return timezone.make_aware(start_date, time_zone)
        else:
            return timezone.make_aware(prev, time_zone)
    # Ignore the microseconds of dt
    dt -= timedelta(microseconds=dt.microsecond)
    # We are looking for a datetime in the form start_date + i * delta
    # which is as close as possible to dt. Since delta could be a relative
    # delta we don't know its exact length in seconds so we cannot rely on
    # division to find i. Instead we employ a binary search algorithm, first
    # finding an upper and lower limit and then dissecting the interval until
    # we have found the closest match.
    # We first search an upper limit for i for which start_date + upper * delta
    # exceeds dt.
    upper = 1
    while start_date + upper * delta < dt:
        # To speed up finding an upper limit we grow this exponentially by a
        # factor of 2
        upper *= 2
    # Since upper is the first value for which start_date + upper * delta
    # exceeds dt, upper // 2 is below dt and therefore forms a lower limit
    # for the i we are looking for
    lower = upper // 2
    # We now continue to bisect the interval between
    # start_date + lower * delta and start_date + upper * delta
    # until we find the closest value
    while True:
        # Invariant: start + lower * delta < dt <= start + upper * delta
        # If start_date + (lower + 1)*delta exceeds dt, then either lower or
        # lower+1 has to be the solution we are searching for
        if start_date + (lower + 1) * delta >= dt:
            # Check if start_date + (lower + 1)*delta or
            # start_date + lower*delta is closer to dt and return the solution.
            # The ``<=`` resolves ties in favour of the later grid point.
            if (start_date + (lower + 1) * delta) - dt <= dt - (start_date + lower * delta):
                return start_date + (lower + 1) * delta
            else:
                return start_date + lower * delta
        # We bisect the interval and either replace the lower or upper
        # limit with the candidate
        candidate = lower + (upper - lower) // 2
        if start_date + candidate * delta >= dt:
            upper = candidate
        else:
            lower = candidate
    # in the special case when start_date > dt the search for upper will
    # immediately stop for upper == 1 which results in lower = upper // 2 = 0
    # and this function returns start_date.
# Units that infer_time_unit may choose and scale_time_units may convert to.
TimeUnit = Literal["days", "hours", "minutes", "seconds"]


def infer_time_unit(time_seconds_arr: Collection[float]) -> TimeUnit:
    """Choose a display unit for a collection of durations measured in seconds.

    The largest duration decides the unit:

    * up to 2 minutes -> ``"seconds"``
    * up to 2 hours -> ``"minutes"``
    * up to 2 days -> ``"hours"``
    * anything longer -> ``"days"``

    An empty collection falls back to ``"hours"``.
    e.g. 5400 seconds => 'minutes', 36000 seconds => 'hours'

    :param time_seconds_arr: durations in seconds
    :return: one of the ``TimeUnit`` literals
    """
    if len(time_seconds_arr) == 0:
        return "hours"

    longest = max(time_seconds_arr)
    one_minute = 60
    one_hour = 60 * one_minute
    one_day = 24 * one_hour

    # Checked from the finest unit upwards; each bound is inclusive.
    if longest <= 2 * one_minute:
        return "seconds"
    if longest <= 2 * one_hour:
        return "minutes"
    if longest <= 2 * one_day:
        return "hours"
    return "days"
def scale_time_units(time_seconds_arr: Collection[float], unit: TimeUnit) -> Collection[float]:
    """Re-express durations given in seconds in the requested ``unit``.

    For ``"minutes"``, ``"hours"`` and ``"days"`` a new list of divided values
    is returned. Any other unit (including ``"seconds"``) returns
    ``time_seconds_arr`` itself, untouched and not copied.
    """
    seconds_per_unit = (
        ("minutes", 60),
        ("hours", 60 * 60),
        ("days", 24 * 60 * 60),
    )
    for name, factor in seconds_per_unit:
        if unit == name:
            return [seconds / factor for seconds in time_seconds_arr]
    return time_seconds_arr
def days_ago(n, hour=0, minute=0, second=0, microsecond=0):
    """Return a datetime lying *n* days before ``timezone.utcnow()``.

    The time-of-day of "now" is first replaced by ``hour``, ``minute``,
    ``second`` and ``microsecond``, which gives midnight by default. The *n*
    days are then subtracted.

    Deprecated: every call emits ``RemovedInAirflow3Warning``.
    """
    deprecation_message = (
        "Function `days_ago` is deprecated and will be removed in Airflow 3.0. "
        "You can achieve equivalent behavior with `pendulum.today('UTC').add(days=-N, ...)`"
    )
    warnings.warn(deprecation_message, RemovedInAirflow3Warning, stacklevel=2)

    now = timezone.utcnow()
    anchored = now.replace(hour=hour, minute=minute, second=second, microsecond=microsecond)
    return anchored - timedelta(days=n)
def parse_execution_date(execution_date_str):
    """Turn an execution-date string into a datetime.

    Thin wrapper: all parsing is delegated to ``timezone.parse``.
    """
    parsed_date = timezone.parse(execution_date_str)
    return parsed_date