# blob: 390eb0edb7ccc24e82355442e09fa3a16fdaa1a3 [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations
import logging
import secrets
import string
import pendulum
from slugify import slugify
from airflow.models.taskinstancekey import TaskInstanceKey
# Module-level logger, named after this module.
log = logging.getLogger(__name__)
# Alphabet that rand_str draws from: lowercase ASCII letters followed by digits.
alphanum_lower = string.ascii_lowercase + string.digits
def rand_str(num):
    """
    Return a random string of ``num`` lowercase letters and digits.

    Each character is picked independently from ``alphanum_lower`` using
    ``secrets.choice``.

    :meta private:
    """
    picked = [secrets.choice(alphanum_lower) for _ in range(num)]
    return "".join(picked)
def add_pod_suffix(*, pod_name, rand_len=8, max_len=80):
    """
    Append ``-<random string>`` to a pod name, truncating the name to make room.

    :param pod_name: name to extend
    :param rand_len: number of random characters placed after the hyphen
    :param max_len: length budget; the name is cut to ``max_len`` minus the suffix length
    :return: the truncated name followed by the random suffix
    """
    random_tail = f"-{rand_str(rand_len)}"
    room_for_name = max_len - len(random_tail)
    # Truncation may leave a dangling "-" or "." at either end; drop those
    # before gluing on the suffix.
    trimmed_name = pod_name[:room_for_name].strip("-.")
    return trimmed_name + random_tail
def create_pod_id(
    dag_id: str | None = None,
    task_id: str | None = None,
    *,
    max_length: int = 80,
    unique: bool = True,
) -> str:
    """
    Generate a pod ID from a dag_id and / or task_id.

    The default of 80 for max length is somewhat arbitrary, mainly a balance between
    content and not overwhelming terminal windows of reasonable width. The true
    upper limit is 253, and this is enforced in construct_pod.

    :param dag_id: DAG ID
    :param task_id: Task ID
    :param max_length: max number of characters
    :param unique: whether a random string suffix should be added
    :return: A valid identifier for a kubernetes pod name
    :raises ValueError: if neither dag_id nor task_id is supplied
    """
    if not dag_id and not task_id:
        raise ValueError("Must supply either dag_id or task_id.")

    # Keep only the identifiers that were actually supplied and hyphen-join them.
    supplied = [part for part in (dag_id, task_id) if part]
    slug = slugify("-".join(supplied), lowercase=True)
    # Truncate first, then drop any "." or "-" left at either end.
    base_name = slug[:max_length].strip(".-")

    if not unique:
        return base_name
    return add_pod_suffix(pod_name=base_name, rand_len=8, max_len=max_length)
def annotations_to_key(annotations: dict[str, str]) -> TaskInstanceKey:
    """
    Build a TaskInstanceKey based on pod annotations.

    ``dag_id``, ``task_id`` and ``try_number`` are read as required keys;
    ``map_index`` defaults to -1 when absent. When ``run_id`` is missing or
    empty but ``execution_date`` is present, the run_id is looked up in the
    database instead.

    :param annotations: pod annotations
    :return: the key assembled from the annotation values
    """
    log.debug("Creating task key for annotations %s", annotations)

    dag_id = annotations["dag_id"]
    task_id = annotations["task_id"]
    try_number = int(annotations["try_number"])
    run_id = annotations.get("run_id")
    map_index = int(annotations.get("map_index", -1))

    if run_id or "execution_date" not in annotations:
        # Nothing to look up: use whatever the annotations gave us for run_id.
        return TaskInstanceKey(
            dag_id=dag_id,
            task_id=task_id,
            run_id=run_id,
            try_number=try_number,
            map_index=map_index,
        )

    # Compat: Look up the run_id from the TI table!
    from airflow.models.dagrun import DagRun
    from airflow.models.taskinstance import TaskInstance
    from airflow.settings import Session

    execution_date = pendulum.parse(annotations["execution_date"])
    # Do _not_ use create-session, we don't want to expunge
    session = Session()
    run_id_query = (
        session.query(TaskInstance.run_id)
        .join(TaskInstance.dag_run)
        .filter(
            TaskInstance.dag_id == dag_id,
            TaskInstance.task_id == task_id,
            DagRun.execution_date == execution_date,
        )
    )
    return TaskInstanceKey(
        dag_id=dag_id,
        task_id=task_id,
        run_id=run_id_query.scalar(),
        try_number=try_number,
        map_index=map_index,
    )