| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| |
| from __future__ import annotations |
| |
| import datetime |
| import logging |
| from typing import TYPE_CHECKING |
| |
| from airflow.configuration import conf |
| from airflow.metrics.protocols import DeltaType, Timer, TimerProtocol |
| from airflow.metrics.validators import ( |
| AllowListValidator, |
| BlockListValidator, |
| ListValidator, |
| validate_stat, |
| ) |
| |
| if TYPE_CHECKING: |
| from datadog import DogStatsd |
| |
| log = logging.getLogger(__name__) |
| |
| |
| class SafeDogStatsdLogger: |
| """DogStatsd Logger.""" |
| |
| def __init__( |
| self, |
| dogstatsd_client: DogStatsd, |
| metrics_validator: ListValidator = AllowListValidator(), |
| metrics_tags: bool = False, |
| metric_tags_validator: ListValidator = AllowListValidator(), |
| ) -> None: |
| self.dogstatsd = dogstatsd_client |
| self.metrics_validator = metrics_validator |
| self.metrics_tags = metrics_tags |
| self.metric_tags_validator = metric_tags_validator |
| |
| @validate_stat |
| def incr( |
| self, |
| stat: str, |
| count: int = 1, |
| rate: float = 1, |
| *, |
| tags: dict[str, str] | None = None, |
| ) -> None: |
| """Increment stat.""" |
| if self.metrics_tags and isinstance(tags, dict): |
| tags_list = [ |
| f"{key}:{value}" for key, value in tags.items() if self.metric_tags_validator.test(key) |
| ] |
| else: |
| tags_list = [] |
| if self.metrics_validator.test(stat): |
| return self.dogstatsd.increment(metric=stat, value=count, tags=tags_list, sample_rate=rate) |
| return None |
| |
| @validate_stat |
| def decr( |
| self, |
| stat: str, |
| count: int = 1, |
| rate: float = 1, |
| *, |
| tags: dict[str, str] | None = None, |
| ) -> None: |
| """Decrement stat.""" |
| if self.metrics_tags and isinstance(tags, dict): |
| tags_list = [ |
| f"{key}:{value}" for key, value in tags.items() if self.metric_tags_validator.test(key) |
| ] |
| else: |
| tags_list = [] |
| if self.metrics_validator.test(stat): |
| return self.dogstatsd.decrement(metric=stat, value=count, tags=tags_list, sample_rate=rate) |
| return None |
| |
| @validate_stat |
| def gauge( |
| self, |
| stat: str, |
| value: int | float, |
| rate: float = 1, |
| delta: bool = False, |
| *, |
| tags: dict[str, str] | None = None, |
| ) -> None: |
| """Gauge stat.""" |
| if self.metrics_tags and isinstance(tags, dict): |
| tags_list = [ |
| f"{key}:{value}" for key, value in tags.items() if self.metric_tags_validator.test(key) |
| ] |
| else: |
| tags_list = [] |
| if self.metrics_validator.test(stat): |
| return self.dogstatsd.gauge(metric=stat, value=value, tags=tags_list, sample_rate=rate) |
| return None |
| |
| @validate_stat |
| def timing( |
| self, |
| stat: str, |
| dt: DeltaType, |
| *, |
| tags: dict[str, str] | None = None, |
| ) -> None: |
| """Stats timing.""" |
| if self.metrics_tags and isinstance(tags, dict): |
| tags_list = [ |
| f"{key}:{value}" for key, value in tags.items() if self.metric_tags_validator.test(key) |
| ] |
| else: |
| tags_list = [] |
| if self.metrics_validator.test(stat): |
| if isinstance(dt, datetime.timedelta): |
| dt = dt.total_seconds() |
| return self.dogstatsd.timing(metric=stat, value=dt, tags=tags_list) |
| return None |
| |
| @validate_stat |
| def timer( |
| self, |
| stat: str | None = None, |
| tags: dict[str, str] | None = None, |
| **kwargs, |
| ) -> TimerProtocol: |
| """Timer metric that can be cancelled.""" |
| if self.metrics_tags and isinstance(tags, dict): |
| tags_list = [ |
| f"{key}:{value}" for key, value in tags.items() if self.metric_tags_validator.test(key) |
| ] |
| else: |
| tags_list = [] |
| if stat and self.metrics_validator.test(stat): |
| return Timer(self.dogstatsd.timed(stat, tags=tags_list, **kwargs)) |
| return Timer() |
| |
| |
| def get_dogstatsd_logger(cls) -> SafeDogStatsdLogger: |
| """Get DataDog StatsD logger.""" |
| from datadog import DogStatsd |
| |
| metrics_validator: ListValidator |
| |
| dogstatsd = DogStatsd( |
| host=conf.get("metrics", "statsd_host"), |
| port=conf.getint("metrics", "statsd_port"), |
| namespace=conf.get("metrics", "statsd_prefix"), |
| constant_tags=cls.get_constant_tags(), |
| ) |
| if conf.get("metrics", "metrics_allow_list", fallback=None): |
| metrics_validator = AllowListValidator(conf.get("metrics", "metrics_allow_list")) |
| if conf.get("metrics", "metrics_block_list", fallback=None): |
| log.warning( |
| "Ignoring metrics_block_list as both metrics_allow_list " |
| "and metrics_block_list have been set" |
| ) |
| elif conf.get("metrics", "metrics_block_list", fallback=None): |
| metrics_validator = BlockListValidator(conf.get("metrics", "metrics_block_list")) |
| else: |
| metrics_validator = AllowListValidator() |
| datadog_metrics_tags = conf.getboolean("metrics", "statsd_datadog_metrics_tags", fallback=True) |
| metric_tags_validator = BlockListValidator(conf.get("metrics", "statsd_disabled_tags", fallback=None)) |
| return SafeDogStatsdLogger(dogstatsd, metrics_validator, datadog_metrics_tags, metric_tags_validator) |