blob: 1d58717287c7337c8deafe37f3c14a2b8715254c [file] [log] [blame]
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations
import inspect
import logging
import socket
from typing import TYPE_CHECKING, Any, Callable
from airflow.configuration import conf
from airflow.typing_compat import Protocol
log = logging.getLogger(__name__)
def gen_context(trace_id, span_id):
"""Generate span context from trace_id and span_id."""
from airflow.traces.otel_tracer import gen_context as otel_gen_context
return otel_gen_context(trace_id, span_id)
def gen_links_from_kv_list(list):
"""Generate links from kv list of {trace_id:int, span_id:int}."""
from airflow.traces.otel_tracer import gen_links_from_kv_list
return gen_links_from_kv_list(list)
def span(func):
"""Decorate a function with span."""
def wrapper(*args, **kwargs):
func_name = func.__name__
qual_name = func.__qualname__
module_name = func.__module__
if "." in qual_name:
component = f"{qual_name.rsplit('.', 1)[0]}"
else:
component = module_name
with Trace.start_span(span_name=func_name, component=component):
if len(inspect.signature(func).parameters) > 0:
return func(*args, **kwargs)
else:
return func()
return wrapper
class EmptyContext:
"""If no Tracer is configured, EmptyContext is used as a fallback."""
def __init__(self):
self.trace_id = 1
class EmptySpan:
"""If no Tracer is configured, EmptySpan is used as a fallback."""
def __enter__(self):
"""Enter."""
return self
def __exit__(self, *args, **kwargs):
"""Exit."""
pass
def __call__(self, obj):
"""Call."""
return obj
def get_span_context(self):
"""Get span context."""
return EMPTY_CTX
def set_attribute(self, key, value) -> None:
"""Set an attribute to the span."""
pass
def set_attributes(self, attributes) -> None:
"""Set multiple attributes at once."""
pass
def is_recording(self):
return False
def add_event(
self,
name: str,
attributes: Any | None = None,
timestamp: int | None = None,
) -> None:
"""Add event to span."""
pass
def add_link(
self,
context: Any,
attributes: Any | None = None,
) -> None:
"""Add link to the span."""
pass
def end(self, end_time=None, *args, **kwargs) -> None:
"""End."""
pass
EMPTY_SPAN = EmptySpan()
EMPTY_CTX = EmptyContext()
class Tracer(Protocol):
"""This class is only used for TypeChecking (for IDEs, mypy, etc)."""
instance: Tracer | EmptyTrace | None = None
@classmethod
def get_tracer(cls, component):
"""Get a tracer."""
raise NotImplementedError()
@classmethod
def start_span(
cls,
span_name: str,
component: str | None = None,
parent_sc=None,
span_id=None,
links=None,
start_time=None,
):
"""Start a span."""
raise NotImplementedError()
@classmethod
def use_span(cls, span):
"""Use a span as current."""
raise NotImplementedError()
@classmethod
def get_current_span(self):
raise NotImplementedError()
@classmethod
def start_span_from_dagrun(
cls,
dagrun,
span_name=None,
service_name=None,
component=None,
links=None,
):
"""Start a span from dagrun."""
raise NotImplementedError()
@classmethod
def start_span_from_taskinstance(
cls,
ti,
span_name=None,
component=None,
child=False,
links=None,
):
"""Start a span from taskinstance."""
raise NotImplementedError()
class EmptyTrace:
"""If no Tracer is configured, EmptyTracer is used as a fallback."""
@classmethod
def get_tracer(
cls,
component: str,
trace_id: int | None = None,
span_id: int | None = None,
):
"""Get a tracer using provided node id and trace id."""
return cls
@classmethod
def start_span(
cls,
span_name: str,
component: str | None = None,
parent_sc=None,
span_id=None,
links=None,
start_time=None,
) -> EmptySpan:
"""Start a span."""
return EMPTY_SPAN
@classmethod
def use_span(cls, span) -> EmptySpan:
"""Use a span as current."""
return EMPTY_SPAN
@classmethod
def get_current_span(self) -> EmptySpan:
"""Get the current span."""
return EMPTY_SPAN
@classmethod
def start_span_from_dagrun(
cls,
dagrun,
span_name=None,
service_name=None,
component=None,
links=None,
) -> EmptySpan:
"""Start a span from dagrun."""
return EMPTY_SPAN
@classmethod
def start_span_from_taskinstance(
cls,
ti,
span_name=None,
component=None,
child=False,
links=None,
) -> EmptySpan:
"""Start a span from taskinstance."""
return EMPTY_SPAN
class _Trace(type):
factory: Callable
instance: Tracer | EmptyTrace | None = None
def __getattr__(cls, name: str) -> str:
if not cls.instance:
try:
cls.instance = cls.factory()
except (socket.gaierror, ImportError) as e:
log.error("Could not configure Trace: %s, using EmptyTrace instead.", e)
cls.instance = EmptyTrace()
return getattr(cls.instance, name)
def __init__(cls, *args, **kwargs) -> None:
super().__init__(cls)
if not hasattr(cls.__class__, "factory"):
if conf.has_option("traces", "otel_on") and conf.getboolean("traces", "otel_on"):
from airflow.traces import otel_tracer
cls.__class__.factory = otel_tracer.get_otel_tracer
else:
cls.__class__.factory = EmptyTrace
@classmethod
def get_constant_tags(cls) -> str | None:
"""Get constant tags to add to all traces."""
tags_in_string = conf.get("traces", "tags", fallback=None)
if not tags_in_string:
return None
return tags_in_string
if TYPE_CHECKING:
Trace: EmptyTrace
else:
class Trace(metaclass=_Trace):
"""Empty class for Trace - we use metaclass to inject the right one."""