blob: a4fe4ec384c5c1e897a0b497daaaf48ffcdcacd2 [file] [log] [blame]
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations
import codecs
import io
import itertools
import logging
import os
import re
import sys
from collections.abc import Callable, Iterable, Mapping, Sequence
from functools import cache, cached_property, partial
from pathlib import Path
from types import ModuleType
from typing import TYPE_CHECKING, Any, BinaryIO, Generic, TextIO, TypeVar, cast
import pygtrie
import structlog
import structlog.processors
from structlog.processors import NAME_TO_LEVEL, CallsiteParameter
from ._noncaching import make_file_io_non_caching
from .percent_formatter import PercentFormatRender
if TYPE_CHECKING:
from structlog.typing import (
BindableLogger,
EventDict,
Processor,
WrappedLogger,
)
from .types import Logger
log = logging.getLogger(__name__)
__all__ = [
"configure_logging",
"structlog_processors",
]
JWT_PATTERN = re.compile(r"eyJ[\.A-Za-z0-9-_]*")
LEVEL_TO_FILTERING_LOGGER: dict[int, type[Logger]] = {}
def _make_airflow_structlogger(min_level):
# This uses https://github.com/hynek/structlog/blob/2f0cc42d/src/structlog/_native.py#L126
# as inspiration
LEVEL_TO_NAME = {v: k for k, v in NAME_TO_LEVEL.items()}
# A few things, namely paramiko _really_ wants this to be a stdlib logger. These fns pretends it is enough
# like it to function.
@cached_property
def handlers(self):
return [logging.NullHandler()]
@property
def level(self):
return min_level
@property
def name(self):
return self._logger.name
def _nop(self: Any, event: str, *args: Any, **kw: Any) -> Any:
return None
# Work around an issue in structlog https://github.com/hynek/structlog/issues/745
def make_method(
level: int,
) -> Callable[..., Any]:
name = LEVEL_TO_NAME[level]
if level < min_level:
return _nop
def meth(self: Any, event: str, *args: Any, **kw: Any) -> Any:
if not args:
return self._proxy_to_logger(name, event, **kw)
# See https://github.com/python/cpython/blob/3.13/Lib/logging/__init__.py#L307-L326 for reason
if args and len(args) == 1 and isinstance(args[0], Mapping) and args[0]:
return self._proxy_to_logger(name, event % args[0], **kw)
return self._proxy_to_logger(name, event % args, **kw)
meth.__name__ = name
return meth
base = structlog.make_filtering_bound_logger(min_level)
cls = type(
f"AirflowBoundLoggerFilteringAt{LEVEL_TO_NAME.get(min_level, 'Notset').capitalize()}",
(base,),
{
"isEnabledFor": base.is_enabled_for,
"getEffectiveLevel": base.get_effective_level,
"level": level,
"name": name,
"handlers": handlers,
}
| {name: make_method(lvl) for lvl, name in LEVEL_TO_NAME.items()},
)
LEVEL_TO_FILTERING_LOGGER[min_level] = cls
return cls
AirflowBoundLoggerFilteringAtNotset = _make_airflow_structlogger(NAME_TO_LEVEL["notset"])
AirflowBoundLoggerFilteringAtDebug = _make_airflow_structlogger(NAME_TO_LEVEL["debug"])
AirflowBoundLoggerFilteringAtInfo = _make_airflow_structlogger(NAME_TO_LEVEL["info"])
AirflowBoundLoggerFilteringAtWarning = _make_airflow_structlogger(NAME_TO_LEVEL["warning"])
AirflowBoundLoggerFilteringAtError = _make_airflow_structlogger(NAME_TO_LEVEL["error"])
AirflowBoundLoggerFilteringAtCritical = _make_airflow_structlogger(NAME_TO_LEVEL["critical"])
# We use a trie structure (sometimes also called a "prefix tree") so that we can easily and quickly find the
# most suitable log level to apply. This mirrors the logging level cascade behavior from stdlib logging,
# without the complexity of multiple handlers etc
PER_LOGGER_LEVELS = pygtrie.StringTrie(separator=".")
PER_LOGGER_LEVELS.update(
{
# Top level logging default - changed to respect config in `configure_logging`
"": NAME_TO_LEVEL["info"],
}
)
def make_filtering_logger() -> Callable[..., BindableLogger]:
def maker(logger: WrappedLogger, *args, **kwargs):
# If the logger is a NamedBytesLogger/NamedWriteLogger (an Airflow specific subclass) then
# look up the global per-logger config and redirect to a new class.
logger_name = kwargs.get("context", {}).get("logger_name")
if not logger_name and isinstance(logger, (NamedWriteLogger, NamedBytesLogger)):
logger_name = logger.name
if (level_override := kwargs.get("context", {}).pop("__level_override", None)) is not None:
level = level_override
elif logger_name:
level = PER_LOGGER_LEVELS.longest_prefix(logger_name).get(PER_LOGGER_LEVELS[""])
else:
level = PER_LOGGER_LEVELS[""]
return LEVEL_TO_FILTERING_LOGGER[level](logger, *args, **kwargs) # type: ignore[call-arg]
return maker
class NamedBytesLogger(structlog.BytesLogger):
__slots__ = ("name",)
def __init__(self, name: str | None = None, file: BinaryIO | None = None):
self.name = name
if file is not None:
file = make_file_io_non_caching(file)
super().__init__(file)
class NamedWriteLogger(structlog.WriteLogger):
__slots__ = ("name",)
def __init__(self, name: str | None = None, file: TextIO | None = None):
self.name = name
if file is not None:
file = make_file_io_non_caching(file)
super().__init__(file)
LogOutputType = TypeVar("LogOutputType", bound=TextIO | BinaryIO)
class LoggerFactory(Generic[LogOutputType]):
def __init__(
self,
cls: type[WrappedLogger],
io: LogOutputType | None = None,
):
self.cls = cls
self.io = io
def __call__(self, logger_name: str | None = None, *args: Any) -> WrappedLogger:
return self.cls(logger_name, self.io) # type: ignore[call-arg]
def logger_name(logger: Any, method_name: Any, event_dict: EventDict) -> EventDict:
if logger_name := (event_dict.pop("logger_name", None) or getattr(logger, "name", None)):
event_dict.setdefault("logger", logger_name)
return event_dict
# `eyJ` is `{"` in base64 encoding -- and any value that starts like that is very likely a JWT
# token. Better safe than sorry
def redact_jwt(logger: Any, method_name: str, event_dict: EventDict) -> EventDict:
for k, v in event_dict.items():
if isinstance(v, str):
event_dict[k] = re.sub(JWT_PATTERN, "eyJ***", v)
return event_dict
def drop_positional_args(logger: Any, method_name: Any, event_dict: EventDict) -> EventDict:
event_dict.pop("positional_args", None)
return event_dict
# This is a placeholder fn, that is "edited" in place via the `suppress_logs_and_warning` decorator
# The reason we need to do it this way is that structlog caches loggers on first use, and those include the
# configured processors, so we can't get away with changing the config as it won't have any effect once the
# logger obj is created and has been used once
def respect_stdlib_disable(logger: Any, method_name: Any, event_dict: EventDict) -> EventDict:
return event_dict
@cache
def structlog_processors(
json_output: bool,
log_format: str = "",
colors: bool = True,
callsite_parameters: tuple[CallsiteParameter, ...] = (),
):
"""
Create the correct list of structlog processors for the given config.
Return value is a tuple of three elements:
1. A list of processors shared for structlog and stdlib
2. The final processor/renderer (one that outputs a string) for use with structlog.stdlib.ProcessorFormatter
``callsite_parameters`` specifies the keys to add to the log event dict. If ``log_format`` is specified
then anything callsite related will be added to this list
:meta private:
"""
timestamper = structlog.processors.MaybeTimeStamper(fmt="iso")
# Processors shared between stdlib handlers and structlog processors
shared_processors: list[structlog.typing.Processor] = [
respect_stdlib_disable,
timestamper,
structlog.contextvars.merge_contextvars,
structlog.processors.add_log_level,
structlog.stdlib.PositionalArgumentsFormatter(),
logger_name,
redact_jwt,
structlog.processors.StackInfoRenderer(),
]
if log_format:
# Maintain the order if any params that are given explicitly, then add on anything needed for the
# format string (so use a dict with None as the values as set doesn't preserve order)
params = {
param: None
for param in itertools.chain(
callsite_parameters or [], PercentFormatRender.callsite_params_from_fmt_string(log_format)
)
}
shared_processors.append(
structlog.processors.CallsiteParameterAdder(list(params.keys()), additional_ignores=[__name__])
)
elif callsite_parameters:
shared_processors.append(
structlog.processors.CallsiteParameterAdder(callsite_parameters, additional_ignores=[__name__])
)
# Imports to suppress showing code from these modules. We need the import to get the filepath for
# structlog to ignore.
import contextlib
import click
suppress: tuple[ModuleType, ...] = (click, contextlib)
try:
import httpcore
suppress = (*suppress, httpcore)
except ImportError:
pass
try:
import httpx
suppress = (*suppress, httpx)
except ImportError:
pass
if json_output:
dict_exc_formatter = structlog.tracebacks.ExceptionDictTransformer(
use_rich=False, show_locals=False, suppress=suppress
)
dict_tracebacks = structlog.processors.ExceptionRenderer(dict_exc_formatter)
import msgspec
def json_dumps(msg, default):
# Note: this is likely an "expensive" step, but lets massage the dict order for nice
# viewing of the raw JSON logs.
# Maybe we don't need this once the UI renders the JSON instead of displaying the raw text
msg = {
"timestamp": msg.pop("timestamp"),
"level": msg.pop("level"),
"event": msg.pop("event"),
**msg,
}
return msgspec.json.encode(msg, enc_hook=default)
json = structlog.processors.JSONRenderer(serializer=json_dumps)
def json_processor(logger: Any, method_name: Any, event_dict: EventDict) -> str:
result = json(logger, method_name, event_dict)
return result.decode("utf-8") if isinstance(result, bytes) else result
shared_processors.extend(
(
dict_tracebacks,
structlog.processors.UnicodeDecoder(),
),
)
return shared_processors, json_processor, json
exc_formatter: structlog.dev.RichTracebackFormatter | structlog.typing.ExceptionRenderer
if os.getenv("DEV", "") != "":
# Only use Rich in dev -- otherwise for "production" deployments it makes the logs harder to read as
# it uses lots of ANSI escapes and non ASCII characters. Simpler is better for non-dev non-JSON
exc_formatter = structlog.dev.RichTracebackFormatter(
# These values are picked somewhat arbitrarily to produce useful-but-compact tracebacks. If
# we ever need to change these then they should be configurable.
extra_lines=0,
max_frames=30,
indent_guides=False,
suppress=suppress,
)
else:
exc_formatter = structlog.dev.plain_traceback
my_styles = structlog.dev.ConsoleRenderer.get_default_level_styles(colors=colors)
if colors:
my_styles["debug"] = structlog.dev.CYAN
console: PercentFormatRender | structlog.dev.ConsoleRenderer
if log_format:
console = PercentFormatRender(
fmt=log_format,
exception_formatter=exc_formatter,
level_styles=my_styles,
colors=colors,
)
else:
if callsite_parameters == (CallsiteParameter.FILENAME, CallsiteParameter.LINENO):
# Nicer formatting of the default callsite config
def log_loc(logger: Any, method_name: Any, event_dict: EventDict) -> EventDict:
if (
event_dict.get("logger") != "py.warnings"
and "filename" in event_dict
and "lineno" in event_dict
):
event_dict["loc"] = f"{event_dict.pop('filename')}:{event_dict.pop('lineno')}"
return event_dict
shared_processors.append(log_loc)
console = structlog.dev.ConsoleRenderer(
exception_formatter=exc_formatter,
level_styles=my_styles,
colors=colors,
)
return shared_processors, console, console
def configure_logging(
*,
json_output: bool = False,
log_level: str = "DEBUG",
log_format: str = "",
stdlib_config: dict | None = None,
extra_processors: Sequence[Processor] | None = None,
callsite_parameters: Iterable[CallsiteParameter] | None = None,
colors: bool = True,
output: LogOutputType | None = None,
namespace_log_levels: str | dict[str, str] | None = None,
cache_logger_on_first_use: bool = True,
):
"""
Configure structlog (and stbilb's logging to send via structlog processors too).
If percent_log_format is passed then it will be handled in a similar mode to stdlib, including
interpolations such as ``%(asctime)s`` etc.
:param json_output: Set to true to write all logs as JSON (one per line)
:param log_level: The default log level to use for most logs
:param log_format: A percent-style log format to write non JSON logs with.
:param output: Where to write the logs to. If ``json_output`` is true this must be a binary stream
:param colors: Whether to use colors for non-JSON logs. This only works if standard out is a TTY (that is,
an interactive session), unless overridden by environment variables described below.
Please note that disabling colors also disables all styling, including bold and italics.
The following environment variables control color behavior (set to any non-empty value to activate):
* ``NO_COLOR`` - Disables colors completely. This takes precedence over all other settings,
including ``FORCE_COLOR``.
* ``FORCE_COLOR`` - Forces colors to be enabled, even when output is not going to a TTY. This only
takes effect if ``NO_COLOR`` is not set.
:param callsite_parameters: A list parameters about the callsite (line number, function name etc) to
include in the logs.
If ``log_format`` is specified, then anything required to populate that (such as ``%(lineno)d``) will
be automatically included.
:param namespace_log_levels: Levels of extra loggers to configure.
To make this easier to use, this can be a string consisting of pairs of ``<logger>=<level>`` (either
string, or space delimited) which will set the level for that specific logger.
For example::
``sqlalchemy=INFO sqlalchemy.engine=DEBUG``
"""
if "fatal" not in NAME_TO_LEVEL:
NAME_TO_LEVEL["fatal"] = NAME_TO_LEVEL["critical"]
def is_atty():
return sys.stdout is not None and hasattr(sys.stdout, "isatty") and sys.stdout.isatty()
colors = os.environ.get("NO_COLOR", "") == "" and (
os.environ.get("FORCE_COLOR", "") != "" or (colors and is_atty())
)
stdlib_config = stdlib_config or {}
extra_processors = extra_processors or ()
PER_LOGGER_LEVELS[""] = NAME_TO_LEVEL[log_level.lower()]
# Extract per-logger-tree levels and set them
if isinstance(namespace_log_levels, str):
log_from_level = partial(re.compile(r"\s*=\s*").split, maxsplit=2)
namespace_log_levels = {
log: level for log, level in map(log_from_level, re.split(r"[\s,]+", namespace_log_levels))
}
if namespace_log_levels:
for log, level in namespace_log_levels.items():
try:
loglevel = NAME_TO_LEVEL[level.lower()]
except KeyError:
raise ValueError(f"Invalid log level for logger {log!r}: {level!r}") from None
else:
PER_LOGGER_LEVELS[log] = loglevel
shared_pre_chain, for_stdlib, for_structlog = structlog_processors(
json_output,
log_format=log_format,
colors=colors,
callsite_parameters=tuple(callsite_parameters or ()),
)
shared_pre_chain += list(extra_processors)
pre_chain: list[structlog.typing.Processor] = [structlog.stdlib.add_logger_name] + shared_pre_chain
# Don't cache the loggers during tests, it makes it hard to capture them
if "PYTEST_VERSION" in os.environ:
cache_logger_on_first_use = False
std_lib_formatter: list[Processor] = [
# TODO: Don't include this if we are using PercentFormatter -- it'll delete something we
# just have to recreated!
structlog.stdlib.ProcessorFormatter.remove_processors_meta,
drop_positional_args,
for_stdlib,
]
wrapper_class = cast("type[BindableLogger]", make_filtering_logger())
if json_output:
logger_factory: LoggerFactory[Any] = LoggerFactory(NamedBytesLogger, io=output)
else:
# There is no universal way of telling if a file-like-object is binary (and needs bytes) or text that
# works for files, sockets and io.StringIO/BytesIO.
# If given a binary object, wrap it in a text mode wrapper
text_output: TextIO | None = None
if output is not None and not hasattr(output, "encoding"):
text_output = io.TextIOWrapper(cast("BinaryIO", output), line_buffering=True)
elif output is not None:
text_output = cast("TextIO", output)
logger_factory = LoggerFactory(NamedWriteLogger, io=text_output)
structlog.configure(
processors=shared_pre_chain + [for_structlog],
cache_logger_on_first_use=cache_logger_on_first_use,
wrapper_class=wrapper_class,
logger_factory=logger_factory,
)
import logging.config
config = {**stdlib_config}
config.setdefault("version", 1)
config.setdefault("disable_existing_loggers", False)
config["formatters"] = {**config.get("formatters", {})}
config["handlers"] = {**config.get("handlers", {})}
config["loggers"] = {**config.get("loggers", {})}
config["formatters"].update(
{
"structlog": {
"()": structlog.stdlib.ProcessorFormatter,
"use_get_message": False,
"processors": std_lib_formatter,
"foreign_pre_chain": pre_chain,
"pass_foreign_args": True,
},
}
)
for section in (config["loggers"], config["handlers"]):
for log_config in section.values():
# We want everything to go via structlog, remove whatever the user might have configured
log_config.pop("stream", None)
log_config.pop("formatter", None)
# log_config.pop("handlers", None)
if output and not hasattr(output, "encoding"):
# This is a BinaryIO, we need to give logging.StreamHandler a TextIO
output = codecs.lookup("utf-8").streamwriter(output) # type: ignore
config["handlers"].update(
{
"default": {
"level": log_level.upper(),
"class": "logging.StreamHandler",
"formatter": "structlog",
"stream": output,
},
}
)
config["loggers"].update(
{
# Set Airflow logging to the level requested, but most everything else at "INFO"
"airflow": {"level": log_level.upper()},
# These ones are too chatty even at info
"httpx": {"level": "WARN"},
"sqlalchemy.engine": {"level": "WARN"},
}
)
config["root"] = {
"handlers": ["default"],
"level": log_level.upper(),
"propagate": True,
}
logging.config.dictConfig(config)
def init_log_folder(directory: str | os.PathLike[str], new_folder_permissions: int):
"""
Prepare the log folder and ensure its mode is as configured.
To handle log writing when tasks are impersonated, the log files need to
be writable by the user that runs the Airflow command and the user
that is impersonated. This is mainly to handle corner cases with the
SubDagOperator. When the SubDagOperator is run, all of the operators
run under the impersonated user and create appropriate log files
as the impersonated user. However, if the user manually runs tasks
of the SubDagOperator through the UI, then the log files are created
by the user that runs the Airflow command. For example, the Airflow
run command may be run by the `airflow_sudoable` user, but the Airflow
tasks may be run by the `airflow` user. If the log files are not
writable by both users, then it's possible that re-running a task
via the UI (or vice versa) results in a permission error as the task
tries to write to a log file created by the other user.
We leave it up to the user to manage their permissions by exposing configuration for both
new folders and new log files. Default is to make new log folders and files group-writeable
to handle most common impersonation use cases. The requirement in this case will be to make
sure that the same group is set as default group for both - impersonated user and main airflow
user.
"""
directory = Path(directory)
for parent in reversed(Path(directory).parents):
parent.mkdir(mode=new_folder_permissions, exist_ok=True)
directory.mkdir(mode=new_folder_permissions, exist_ok=True)
def init_log_file(
base_log_folder: str | os.PathLike[str],
local_relative_path: str | os.PathLike[str],
*,
new_folder_permissions: int = 0o775,
new_file_permissions: int = 0o664,
) -> Path:
"""
Ensure log file and parent directories are created with the correct permissions.
Any directories that are missing are created with the right permission bits.
See above ``init_log_folder`` method for more detailed explanation.
"""
full_path = Path(base_log_folder, local_relative_path)
init_log_folder(full_path.parent, new_folder_permissions)
try:
full_path.touch(new_file_permissions)
except OSError as e:
log = structlog.get_logger(__name__)
log.warning("OSError while changing ownership of the log file. %s", e)
return full_path
def reconfigure_logger(
logger: WrappedLogger, without_processor_type: type, level_override: int | None = None
):
procs = getattr(logger, "_processors", None)
if procs is None:
procs = structlog.get_config()["processors"]
procs = [proc for proc in procs if not isinstance(proc, without_processor_type)]
return structlog.wrap_logger(
getattr(logger, "_logger", None),
processors=procs,
**getattr(logger, "_context", {}),
__level_override=level_override,
)
if __name__ == "__main__":
configure_logging(
# json_output=True,
log_format="[%(blue)s%(asctime)s%(reset)s] {%(blue)s%(filename)s:%(reset)s%(lineno)d} %(log_color)s%(levelname)s%(reset)s - %(log_color)s%(message)s%(reset)s",
)
log = logging.getLogger("testing.stlib")
log2 = structlog.get_logger(logger_name="testing.structlog")
def raises():
try:
1 / 0
except ZeroDivisionError:
log.exception("str")
try:
1 / 0
except ZeroDivisionError:
log2.exception("std")
def main():
log.info("in main")
log2.info("in main", key="value")
raises()
main()