blob: 3a07b149ae72a37ec56805844fb652242c4d7df2 [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations
import datetime
import logging
from typing import TYPE_CHECKING
from airflow.configuration import conf
from airflow.metrics.protocols import DeltaType, Timer, TimerProtocol
from airflow.metrics.validators import (
AllowListValidator,
BlockListValidator,
ListValidator,
validate_stat,
)
if TYPE_CHECKING:
from datadog import DogStatsd
log = logging.getLogger(__name__)
class SafeDogStatsdLogger:
    """
    DogStatsd logger that filters metric names and tags before emitting.

    Every emitting method checks the metric name with ``metrics_validator.test``
    and returns ``None`` without calling the client when the name is rejected.
    Tags are forwarded only when ``metrics_tags`` is truthy, and then only the
    tags whose key passes ``metric_tags_validator.test``.

    :param dogstatsd_client: client that accepted metrics are forwarded to.
    :param metrics_validator: validator deciding which metric names are emitted.
    :param metrics_tags: whether tags are forwarded to the client at all.
    :param metric_tags_validator: validator deciding which tag keys are kept.
    """

    def __init__(
        self,
        dogstatsd_client: DogStatsd,
        metrics_validator: ListValidator = AllowListValidator(),
        metrics_tags: bool = False,
        metric_tags_validator: ListValidator = AllowListValidator(),
    ) -> None:
        self.dogstatsd = dogstatsd_client
        self.metrics_validator = metrics_validator
        self.metrics_tags = metrics_tags
        self.metric_tags_validator = metric_tags_validator

    def _build_tags_list(self, tags: dict[str, str] | None) -> list[str]:
        """
        Render the permitted tags as ``key:value`` strings.

        :param tags: tag mapping supplied by the caller, or ``None``.
        :return: one ``key:value`` string per tag whose key passes
            ``metric_tags_validator``; an empty list when tags are disabled or
            ``tags`` is not a dict.
        """
        if not (self.metrics_tags and isinstance(tags, dict)):
            return []
        return [f"{key}:{value}" for key, value in tags.items() if self.metric_tags_validator.test(key)]

    @validate_stat
    def incr(
        self,
        stat: str,
        count: int = 1,
        rate: float = 1,
        *,
        tags: dict[str, str] | None = None,
    ) -> None:
        """
        Increment stat.

        :param stat: metric name.
        :param count: amount to increment by.
        :param rate: sample rate passed to the client.
        :param tags: optional tags, filtered before being sent.
        :return: result of the client's ``increment`` call, or ``None`` when
            the metric name is rejected.
        """
        # Check the name first so no tag formatting is done for dropped metrics.
        if not self.metrics_validator.test(stat):
            return None
        return self.dogstatsd.increment(
            metric=stat, value=count, tags=self._build_tags_list(tags), sample_rate=rate
        )

    @validate_stat
    def decr(
        self,
        stat: str,
        count: int = 1,
        rate: float = 1,
        *,
        tags: dict[str, str] | None = None,
    ) -> None:
        """
        Decrement stat.

        :param stat: metric name.
        :param count: amount to decrement by.
        :param rate: sample rate passed to the client.
        :param tags: optional tags, filtered before being sent.
        :return: result of the client's ``decrement`` call, or ``None`` when
            the metric name is rejected.
        """
        if not self.metrics_validator.test(stat):
            return None
        return self.dogstatsd.decrement(
            metric=stat, value=count, tags=self._build_tags_list(tags), sample_rate=rate
        )

    @validate_stat
    def gauge(
        self,
        stat: str,
        value: int | float,
        rate: float = 1,
        delta: bool = False,
        *,
        tags: dict[str, str] | None = None,
    ) -> None:
        """
        Gauge stat.

        :param stat: metric name.
        :param value: gauge value.
        :param rate: sample rate passed to the client.
        :param delta: accepted as part of the signature but not forwarded to
            the client; it has no effect here.
        :param tags: optional tags, filtered before being sent.
        :return: result of the client's ``gauge`` call, or ``None`` when the
            metric name is rejected.
        """
        if not self.metrics_validator.test(stat):
            return None
        return self.dogstatsd.gauge(
            metric=stat, value=value, tags=self._build_tags_list(tags), sample_rate=rate
        )

    @validate_stat
    def timing(
        self,
        stat: str,
        dt: DeltaType,
        *,
        tags: dict[str, str] | None = None,
    ) -> None:
        """
        Stats timing.

        :param stat: metric name.
        :param dt: duration; a ``datetime.timedelta`` is converted with
            ``total_seconds()``, any other value is forwarded unchanged.
        :param tags: optional tags, filtered before being sent.
        :return: result of the client's ``timing`` call, or ``None`` when the
            metric name is rejected.
        """
        if not self.metrics_validator.test(stat):
            return None
        if isinstance(dt, datetime.timedelta):
            # NOTE(review): the value is sent in seconds; confirm this is the
            # unit consumers of the timing metric expect.
            dt = dt.total_seconds()
        return self.dogstatsd.timing(metric=stat, value=dt, tags=self._build_tags_list(tags))

    @validate_stat
    def timer(
        self,
        stat: str | None = None,
        tags: dict[str, str] | None = None,
        **kwargs,
    ) -> TimerProtocol:
        """
        Timer metric that can be cancelled.

        :param stat: metric name; when empty or rejected by the validator a
            ``Timer`` with no underlying client timer is returned.
        :param tags: optional tags, filtered before being sent.
        :param kwargs: extra keyword arguments passed to the client's ``timed``.
        :return: a ``Timer`` wrapping the client's ``timed`` object, or a bare
            ``Timer()``.
        """
        if stat and self.metrics_validator.test(stat):
            return Timer(self.dogstatsd.timed(stat, tags=self._build_tags_list(tags), **kwargs))
        return Timer()
def get_dogstatsd_logger(cls) -> SafeDogStatsdLogger:
    """
    Build a ``SafeDogStatsdLogger`` from the ``metrics`` configuration section.

    :param cls: object providing ``get_constant_tags()``; its result is passed
        to the DogStatsd client as ``constant_tags``.
    :return: logger wrapping a newly created DogStatsd client.
    """
    # Imported inside the function; at module level DogStatsd is only imported
    # under TYPE_CHECKING.
    from datadog import DogStatsd

    section = "metrics"
    client = DogStatsd(
        host=conf.get(section, "statsd_host"),
        port=conf.getint(section, "statsd_port"),
        namespace=conf.get(section, "statsd_prefix"),
        constant_tags=cls.get_constant_tags(),
    )

    name_validator: ListValidator
    if conf.get(section, "metrics_allow_list", fallback=None):
        # The allow list wins; a block list configured alongside it is ignored.
        name_validator = AllowListValidator(conf.get(section, "metrics_allow_list"))
        if conf.get(section, "metrics_block_list", fallback=None):
            log.warning(
                "Ignoring metrics_block_list as both metrics_allow_list "
                "and metrics_block_list have been set"
            )
    elif conf.get(section, "metrics_block_list", fallback=None):
        name_validator = BlockListValidator(conf.get(section, "metrics_block_list"))
    else:
        # Neither list is configured: fall back to a default allow-list validator.
        name_validator = AllowListValidator()

    tags_enabled = conf.getboolean(section, "statsd_datadog_metrics_tags", fallback=True)
    tag_validator = BlockListValidator(conf.get(section, "statsd_disabled_tags", fallback=None))

    return SafeDogStatsdLogger(
        dogstatsd_client=client,
        metrics_validator=name_validator,
        metrics_tags=tags_enabled,
        metric_tags_validator=tag_validator,
    )