| # |
| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| from __future__ import annotations |
| |
| import datetime |
| |
| |
| def max_partition( |
| table, schema="default", field=None, filter_map=None, metastore_conn_id='metastore_default' |
| ): |
| """ |
| Gets the max partition for a table. |
| |
| :param schema: The hive schema the table lives in |
| :param table: The hive table you are interested in, supports the dot |
| notation as in "my_database.my_table", if a dot is found, |
| the schema param is disregarded |
| :param metastore_conn_id: The hive connection you are interested in. |
| If your default is set you don't need to use this parameter. |
| :param filter_map: partition_key:partition_value map used for partition filtering, |
| e.g. {'key1': 'value1', 'key2': 'value2'}. |
| Only partitions matching all partition_key:partition_value |
| pairs will be considered as candidates of max partition. |
| :param field: the field to get the max value from. If there's only |
| one partition field, this will be inferred |
| |
| >>> max_partition('airflow.static_babynames_partitioned') |
| '2015-01-01' |
| """ |
| from airflow.providers.apache.hive.hooks.hive import HiveMetastoreHook |
| |
| if '.' in table: |
| schema, table = table.split('.') |
| hive_hook = HiveMetastoreHook(metastore_conn_id=metastore_conn_id) |
| return hive_hook.max_partition(schema=schema, table_name=table, field=field, filter_map=filter_map) |
| |
| |
| def _closest_date(target_dt, date_list, before_target=None): |
| """ |
| This function finds the date in a list closest to the target date. |
| An optional parameter can be given to get the closest before or after. |
| |
| :param target_dt: The target date |
| :param date_list: The list of dates to search |
| :param before_target: closest before or after the target |
| :returns: The closest date |
| :rtype: datetime.date or None |
| """ |
| time_before = lambda d: target_dt - d if d <= target_dt else datetime.timedelta.max |
| time_after = lambda d: d - target_dt if d >= target_dt else datetime.timedelta.max |
| any_time = lambda d: target_dt - d if d < target_dt else d - target_dt |
| if before_target is None: |
| return min(date_list, key=any_time).date() |
| if before_target: |
| return min(date_list, key=time_before).date() |
| else: |
| return min(date_list, key=time_after).date() |
| |
| |
| def closest_ds_partition(table, ds, before=True, schema="default", metastore_conn_id='metastore_default'): |
| """ |
| This function finds the date in a list closest to the target date. |
| An optional parameter can be given to get the closest before or after. |
| |
| :param table: A hive table name |
| :param ds: A datestamp ``%Y-%m-%d`` e.g. ``yyyy-mm-dd`` |
| :param before: closest before (True), after (False) or either side of ds |
| :param schema: table schema |
| :param metastore_conn_id: which metastore connection to use |
| :returns: The closest date |
| :rtype: str or None |
| |
| >>> tbl = 'airflow.static_babynames_partitioned' |
| >>> closest_ds_partition(tbl, '2015-01-02') |
| '2015-01-01' |
| """ |
| from airflow.providers.apache.hive.hooks.hive import HiveMetastoreHook |
| |
| if '.' in table: |
| schema, table = table.split('.') |
| hive_hook = HiveMetastoreHook(metastore_conn_id=metastore_conn_id) |
| partitions = hive_hook.get_partitions(schema=schema, table_name=table) |
| if not partitions: |
| return None |
| part_vals = [list(p.values())[0] for p in partitions] |
| if ds in part_vals: |
| return ds |
| else: |
| parts = [datetime.datetime.strptime(pv, '%Y-%m-%d') for pv in part_vals] |
| target_dt = datetime.datetime.strptime(ds, '%Y-%m-%d') |
| closest_ds = _closest_date(target_dt, parts, before_target=before) |
| return closest_ds.isoformat() |