blob: 4c8f6fb1d7c764bb7b0bffc144cdf60d1174e851 [file] [log] [blame]
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations
import importlib
import logging
import re
from unittest import mock
from unittest.mock import Mock
import pytest
import statsd
import airflow
from airflow.exceptions import AirflowConfigException, InvalidStatsNameException
from airflow.metrics.datadog_logger import SafeDogStatsdLogger
from airflow.metrics.statsd_logger import SafeStatsdLogger
from airflow.metrics.validators import (
AllowListValidator,
BlockListValidator,
PatternAllowListValidator,
PatternBlockListValidator,
)
from tests.test_utils.config import conf_vars
class CustomStatsd(statsd.StatsClient):
    """Custom StatsD client that subclasses ``statsd.StatsClient`` without changing it."""
class InvalidCustomStatsd:
    """
    Custom Statsd class that is invalid on purpose.

    It does not subclass statsd.StatsClient; the constructor accepts the
    ``host``, ``port`` and ``prefix`` arguments and ignores all of them.
    """

    def __init__(self, host=None, port=None, prefix=None):
        # Nothing is stored: instances carry no state at all.
        return None
class TestStats:
    """Tests for ``SafeStatsdLogger`` and for the config-driven setup of ``airflow.stats.Stats``."""

    def setup_method(self):
        # The client is a spec'd mock, so only attributes of ``statsd.StatsClient`` exist on it.
        self.statsd_client = Mock(spec=statsd.StatsClient)
        self.stats = SafeStatsdLogger(self.statsd_client)

    def teardown_method(self):
        # Several tests reload ``airflow.stats`` under patched config. Reloading here, after
        # the config is restored, keeps the cleanup running even when an assertion fails.
        importlib.reload(airflow.stats)

    def test_increment_counter_with_valid_name(self):
        self.stats.incr("test_stats_run")
        self.statsd_client.incr.assert_called_once_with("test_stats_run", 1, 1)

    def test_stat_name_must_be_a_string(self):
        self.stats.incr([])
        # Check the ``incr`` child mock: ``assert_not_called`` on the client mock itself only
        # verifies that the client object was never invoked as a callable.
        self.statsd_client.incr.assert_not_called()

    def test_stat_name_must_not_exceed_max_length(self):
        self.stats.incr("X" * 300)
        self.statsd_client.incr.assert_not_called()

    def test_stat_name_must_only_include_allowed_characters(self):
        self.stats.incr("test/$tats")
        self.statsd_client.incr.assert_not_called()

    def test_timer(self):
        with self.stats.timer("empty_timer") as t:
            pass
        self.statsd_client.timer.assert_called_once_with("empty_timer")
        assert isinstance(t.duration, float)

    def test_empty_timer(self):
        with self.stats.timer():
            pass
        self.statsd_client.timer.assert_not_called()

    def test_timing(self):
        self.stats.timing("empty_timer", 123)
        self.statsd_client.timing.assert_called_once_with("empty_timer", 123)

    def test_gauge(self):
        self.stats.gauge("empty", 123)
        self.statsd_client.gauge.assert_called_once_with("empty", 123, 1, False)

    def test_decr(self):
        self.stats.decr("empty")
        self.statsd_client.decr.assert_called_once_with("empty", 1, 1)

    def test_enabled_by_config(self):
        """Test that enabling this sets the right instance properties"""
        with conf_vars({("metrics", "statsd_on"): "True"}):
            importlib.reload(airflow.stats)
            assert isinstance(airflow.stats.Stats.statsd, statsd.StatsClient)
            assert not hasattr(airflow.stats.Stats, "dogstatsd")

    def test_load_custom_statsd_client(self):
        """A configured custom client class that extends ``statsd.StatsClient`` is used."""
        with conf_vars(
            {
                ("metrics", "statsd_on"): "True",
                ("metrics", "statsd_custom_client_path"): f"{__name__}.CustomStatsd",
            }
        ):
            importlib.reload(airflow.stats)
            assert isinstance(airflow.stats.Stats.statsd, CustomStatsd)

    def test_load_invalid_custom_stats_client(self):
        """A custom client class that does not extend ``statsd.StatsClient`` is rejected."""
        with conf_vars(
            {
                ("metrics", "statsd_on"): "True",
                ("metrics", "statsd_custom_client_path"): f"{__name__}.InvalidCustomStatsd",
            }
        ):
            importlib.reload(airflow.stats)
            error_message = re.escape(
                "Your custom StatsD client must extend the statsd."
                "StatsClient in order to ensure backwards compatibility."
            )
            with pytest.raises(AirflowConfigException, match=error_message):
                airflow.stats.Stats.incr("empty_key")

    def test_load_allow_list_validator(self):
        with conf_vars(
            {
                ("metrics", "statsd_on"): "True",
                ("metrics", "metrics_allow_list"): "name1,name2",
            }
        ):
            importlib.reload(airflow.stats)
            assert isinstance(airflow.stats.Stats.metrics_validator, AllowListValidator)
            assert airflow.stats.Stats.metrics_validator.validate_list == ("name1", "name2")

    def test_load_block_list_validator(self):
        with conf_vars(
            {
                ("metrics", "statsd_on"): "True",
                ("metrics", "metrics_block_list"): "name1,name2",
            }
        ):
            importlib.reload(airflow.stats)
            assert isinstance(airflow.stats.Stats.metrics_validator, BlockListValidator)
            assert airflow.stats.Stats.metrics_validator.validate_list == ("name1", "name2")

    def test_load_allow_and_block_list_validator_loads_only_allow_list_validator(self):
        with conf_vars(
            {
                ("metrics", "statsd_on"): "True",
                ("metrics", "metrics_allow_list"): "name1,name2",
                ("metrics", "metrics_block_list"): "name1,name2",
            }
        ):
            importlib.reload(airflow.stats)
            assert isinstance(airflow.stats.Stats.metrics_validator, AllowListValidator)
            assert airflow.stats.Stats.metrics_validator.validate_list == ("name1", "name2")
class TestDogStats:
    """Tests for ``SafeDogStatsdLogger`` and for enabling DogStatsd through configuration."""

    def setup_method(self):
        # Skip the whole class when the optional ``datadog`` package is not installed.
        pytest.importorskip("datadog")
        from datadog import DogStatsd

        self.dogstatsd_client = Mock(spec=DogStatsd)
        self.dogstatsd = SafeDogStatsdLogger(self.dogstatsd_client)

    def teardown_method(self):
        # Some tests reload ``airflow.stats`` under patched config. Reloading here, after the
        # config is restored, keeps the cleanup running even when an assertion fails.
        importlib.reload(airflow.stats)

    def test_increment_counter_with_valid_name_with_dogstatsd(self):
        self.dogstatsd.incr("test_stats_run")
        self.dogstatsd_client.increment.assert_called_once_with(
            metric="test_stats_run", sample_rate=1, tags=[], value=1
        )

    def test_stat_name_must_be_a_string_with_dogstatsd(self):
        self.dogstatsd.incr([])
        # Check the ``increment`` child mock: ``assert_not_called`` on the client mock itself
        # only verifies that the client object was never invoked as a callable.
        self.dogstatsd_client.increment.assert_not_called()

    def test_stat_name_must_not_exceed_max_length_with_dogstatsd(self):
        self.dogstatsd.incr("X" * 300)
        self.dogstatsd_client.increment.assert_not_called()

    def test_stat_name_must_only_include_allowed_characters_with_dogstatsd(self):
        self.dogstatsd.incr("test/$tats")
        self.dogstatsd_client.increment.assert_not_called()

    def test_does_send_stats_using_dogstatsd_when_dogstatsd_on(self):
        self.dogstatsd.incr("empty_key")
        self.dogstatsd_client.increment.assert_called_once_with(
            metric="empty_key", sample_rate=1, tags=[], value=1
        )

    def test_does_send_stats_using_dogstatsd_with_tags_without_enabled_metrics_tags(self):
        # The logger was built without ``metrics_tags``, so the tags are expected to be dropped.
        self.dogstatsd.incr("empty_key", 1, 1, tags={"key1": "value1", "key2": "value2"})
        self.dogstatsd_client.increment.assert_called_once_with(
            metric="empty_key", sample_rate=1, tags=[], value=1
        )

    def test_does_send_stats_using_dogstatsd_when_statsd_and_dogstatsd_both_on(self):
        # TODO: Figure out why this is identical to
        #  test_does_send_stats_using_dogstatsd_when_dogstatsd_on
        self.dogstatsd.incr("empty_key")
        self.dogstatsd_client.increment.assert_called_once_with(
            metric="empty_key", sample_rate=1, tags=[], value=1
        )

    def test_timer(self):
        with self.dogstatsd.timer("empty_timer"):
            pass
        self.dogstatsd_client.timed.assert_called_once_with("empty_timer", tags=[])

    def test_empty_timer(self):
        with self.dogstatsd.timer():
            pass
        self.dogstatsd_client.timed.assert_not_called()

    def test_timing(self):
        import datetime

        self.dogstatsd.timing("empty_timer", 123)
        self.dogstatsd_client.timing.assert_called_once_with(metric="empty_timer", value=123, tags=[])

        # A ``timedelta`` of 123 seconds is expected to reach the client as the float 123.0.
        self.dogstatsd.timing("empty_timer", datetime.timedelta(seconds=123))
        self.dogstatsd_client.timing.assert_called_with(metric="empty_timer", value=123.0, tags=[])

    def test_gauge(self):
        self.dogstatsd.gauge("empty", 123)
        self.dogstatsd_client.gauge.assert_called_once_with(metric="empty", sample_rate=1, value=123, tags=[])

    def test_decr(self):
        self.dogstatsd.decr("empty")
        self.dogstatsd_client.decrement.assert_called_once_with(
            metric="empty", sample_rate=1, value=1, tags=[]
        )

    def test_enabled_by_config(self):
        """Test that enabling this sets the right instance properties"""
        from datadog import DogStatsd

        with conf_vars(
            {("metrics", "statsd_datadog_enabled"): "True", ("metrics", "metrics_use_pattern_match"): "True"}
        ):
            importlib.reload(airflow.stats)
            assert isinstance(airflow.stats.Stats.dogstatsd, DogStatsd)
            assert not hasattr(airflow.stats.Stats, "statsd")

    def test_does_not_send_stats_using_statsd_when_statsd_and_dogstatsd_both_on(self):
        """With both backends switched on, only the DogStatsd client is exposed on ``Stats``."""
        from datadog import DogStatsd

        with conf_vars(
            {
                ("metrics", "statsd_on"): "True",
                ("metrics", "statsd_datadog_enabled"): "True",
                ("metrics", "metrics_use_pattern_match"): "True",
            }
        ):
            importlib.reload(airflow.stats)
            assert isinstance(airflow.stats.Stats.dogstatsd, DogStatsd)
            assert not hasattr(airflow.stats.Stats, "statsd")
class TestStatsAllowAndBlockLists:
    """Check which stat names reach the StatsD client under each allow/block list validator."""

    @pytest.mark.parametrize(
        "validator, stat_name, expect_incr",
        [
            (PatternAllowListValidator, "stats_one", True),
            (PatternAllowListValidator, "stats_two.bla", True),
            (PatternAllowListValidator, "stats_three.foo", True),
            (PatternAllowListValidator, "stats_foo_three", True),
            (PatternAllowListValidator, "stats_three", False),
            (AllowListValidator, "stats_one", True),
            (AllowListValidator, "stats_two.bla", True),
            (AllowListValidator, "stats_three.foo", False),
            (AllowListValidator, "stats_foo_three", False),
            (AllowListValidator, "stats_three", False),
            (PatternBlockListValidator, "stats_one", False),
            (PatternBlockListValidator, "stats_two.bla", False),
            (PatternBlockListValidator, "stats_three.foo", False),
            (PatternBlockListValidator, "stats_foo_three", False),
            (PatternBlockListValidator, "stats_foo", False),
            (PatternBlockListValidator, "stats_three", True),
            (BlockListValidator, "stats_one", False),
            (BlockListValidator, "stats_two.bla", False),
            (BlockListValidator, "stats_three.foo", True),
            (BlockListValidator, "stats_foo_three", True),
            (BlockListValidator, "stats_three", True),
        ],
    )
    def test_allow_and_block_list(self, validator, stat_name, expect_incr):
        """Each validator is built from the same list and decides whether ``incr`` is forwarded."""
        statsd_client = Mock(spec=statsd.StatsClient)
        stats = SafeStatsdLogger(statsd_client, validator("stats_one, stats_two, foo"))

        stats.incr(stat_name)

        if expect_incr:
            statsd_client.incr.assert_called_once_with(stat_name, 1, 1)
        else:
            # Check the ``incr`` child mock; the parent mock's ``assert_not_called`` would
            # pass even if the metric had been forwarded.
            statsd_client.incr.assert_not_called()

    @pytest.mark.parametrize(
        "match_pattern, expect_incr",
        [
            ("^stat", True),  # Match: Regex Startswith
            ("a.{4}o", True),  # Match: RegEx Pattern
            ("foo", True),  # Match: Any substring
            ("stat", True),  # Match: Substring Startswith
            ("^banana", False),  # No match
        ],
    )
    def test_regex_matches(self, match_pattern, expect_incr):
        """``PatternAllowListValidator`` forwards the stat only when the pattern matches its name."""
        stat_name = "stats_foo_one"
        validator = PatternAllowListValidator
        statsd_client = Mock(spec=statsd.StatsClient)
        stats = SafeStatsdLogger(statsd_client, validator(match_pattern))

        stats.incr(stat_name)

        if expect_incr:
            statsd_client.incr.assert_called_once_with(stat_name, 1, 1)
        else:
            statsd_client.incr.assert_not_called()
class TestPatternOrBasicValidatorConfigOption:
    """Check which validator class ``airflow.stats`` picks for each combination of metrics options."""

    def teardown_method(self):
        # Avoid side-effects
        importlib.reload(airflow.stats)

    # Config fragments that the parametrized cases below merge together.
    stats_on = {("metrics", "statsd_on"): "True"}
    pattern_on = {("metrics", "metrics_use_pattern_match"): "True"}
    pattern_off = {("metrics", "metrics_use_pattern_match"): "False"}
    allow_list = {("metrics", "metrics_allow_list"): "foo,bar"}
    block_list = {("metrics", "metrics_block_list"): "foo,bar"}

    @pytest.mark.parametrize(
        "config, expected",
        [
            pytest.param(
                {**stats_on, **pattern_on},
                PatternAllowListValidator,
                id="pattern_allow_by_default",
            ),
            pytest.param(
                stats_on,
                AllowListValidator,
                id="basic_allow_by_default",
            ),
            pytest.param(
                {**stats_on, **pattern_on, **allow_list},
                PatternAllowListValidator,
                id="pattern_allow_list_provided",
            ),
            pytest.param(
                {**stats_on, **pattern_off, **allow_list},
                AllowListValidator,
                id="basic_allow_list_provided",
            ),
            pytest.param(
                {**stats_on, **pattern_on, **block_list},
                PatternBlockListValidator,
                id="pattern_block_list_provided",
            ),
            pytest.param(
                {**stats_on, **block_list},
                BlockListValidator,
                id="basic_block_list_provided",
            ),
        ],
    )
    def test_pattern_or_basic_picker(self, config, expected):
        with conf_vars(config):
            importlib.reload(airflow.stats)

            assert isinstance(airflow.stats.Stats.statsd, statsd.StatsClient)
            # Exact class match on purpose: an identity check cannot be satisfied by a subclass.
            assert type(airflow.stats.Stats.instance.metrics_validator) is expected

    @conf_vars({**stats_on, **block_list, ("metrics", "metrics_allow_list"): "bax,qux"})
    def test_setting_allow_and_block_logs_warning(self, caplog):
        """When both lists are configured the allow list is used and a warning is logged."""
        # The capture level has to be in place before the code that logs runs, so the reload
        # and the attribute access both happen inside ``caplog.at_level``.
        with caplog.at_level(logging.WARNING):
            importlib.reload(airflow.stats)

            assert isinstance(airflow.stats.Stats.statsd, statsd.StatsClient)
            assert type(airflow.stats.Stats.instance.metrics_validator) is AllowListValidator

        assert "Ignoring metrics_block_list" in caplog.text
class TestDogStatsWithAllowList:
    """Tests for ``SafeDogStatsdLogger`` combined with an ``AllowListValidator``."""

    def setup_method(self):
        # Skip the whole class when the optional ``datadog`` package is not installed.
        pytest.importorskip("datadog")
        from datadog import DogStatsd

        # ``spec=`` (not ``speck=``): a misspelled keyword is stored as a plain attribute on
        # the mock and leaves it unconstrained.
        self.dogstatsd_client = Mock(spec=DogStatsd)
        self.dogstats = SafeDogStatsdLogger(self.dogstatsd_client, AllowListValidator("stats_one, stats_two"))

    def test_increment_counter_with_allowed_key(self):
        self.dogstats.incr("stats_one")
        self.dogstatsd_client.increment.assert_called_once_with(
            metric="stats_one", sample_rate=1, tags=[], value=1
        )

    def test_increment_counter_with_allowed_prefix(self):
        self.dogstats.incr("stats_two.bla")
        self.dogstatsd_client.increment.assert_called_once_with(
            metric="stats_two.bla", sample_rate=1, tags=[], value=1
        )

    def test_not_increment_counter_if_not_allowed(self):
        self.dogstats.incr("stats_three")
        # Check the ``increment`` child mock; ``assert_not_called`` on the client mock itself
        # would pass even if the metric had been sent.
        self.dogstatsd_client.increment.assert_not_called()
class TestDogStatsWithMetricsTags:
    """Tests for ``SafeDogStatsdLogger`` created with ``metrics_tags=True``."""

    def setup_method(self):
        # Skip the whole class when the optional ``datadog`` package is not installed.
        pytest.importorskip("datadog")
        from datadog import DogStatsd

        # ``spec=`` (not ``speck=``): a misspelled keyword is stored as a plain attribute on
        # the mock and leaves it unconstrained.
        self.dogstatsd_client = Mock(spec=DogStatsd)
        self.dogstatsd = SafeDogStatsdLogger(self.dogstatsd_client, metrics_tags=True)

    def test_does_send_stats_using_dogstatsd_with_tags(self):
        self.dogstatsd.incr("empty_key", 1, 1, tags={"key1": "value1", "key2": "value2"})
        # The tag mapping is expected to reach the client as ``key:value`` strings.
        self.dogstatsd_client.increment.assert_called_once_with(
            metric="empty_key", sample_rate=1, tags=["key1:value1", "key2:value2"], value=1
        )
class TestDogStatsWithDisabledMetricsTags:
    """Tests for ``SafeDogStatsdLogger`` with tags enabled and a block list applied to tag names."""

    def setup_method(self):
        # Skip the whole class when the optional ``datadog`` package is not installed.
        pytest.importorskip("datadog")
        from datadog import DogStatsd

        # ``spec=`` (not ``speck=``): a misspelled keyword is stored as a plain attribute on
        # the mock and leaves it unconstrained.
        self.dogstatsd_client = Mock(spec=DogStatsd)
        self.dogstatsd = SafeDogStatsdLogger(
            self.dogstatsd_client,
            metrics_tags=True,
            metric_tags_validator=BlockListValidator("key1"),
        )

    def test_does_send_stats_using_dogstatsd_with_tags(self):
        self.dogstatsd.incr("empty_key", 1, 1, tags={"key1": "value1", "key2": "value2"})
        # ``key1`` is on the tag block list, so only ``key2`` is expected to be forwarded.
        self.dogstatsd_client.increment.assert_called_once_with(
            metric="empty_key", sample_rate=1, tags=["key2:value2"], value=1
        )
class TestStatsWithInfluxDBEnabled:
    """Tests for ``SafeStatsdLogger`` with InfluxDB-style tags switched on."""

    def setup_method(self):
        influxdb_config = {
            ("metrics", "statsd_on"): "True",
            ("metrics", "statsd_influxdb_enabled"): "True",
        }
        with conf_vars(influxdb_config):
            self.statsd_client = Mock(spec=statsd.StatsClient)
            # Tags named ``key2`` or ``key3`` are on the block list.
            tag_validator = BlockListValidator("key2,key3")
            self.stats = SafeStatsdLogger(
                self.statsd_client,
                influxdb_tags_enabled=True,
                metric_tags_validator=tag_validator,
            )

    def test_increment_counter(self):
        stat = "test_stats_run.delay"
        self.stats.incr(stat)
        self.statsd_client.incr.assert_called_once_with("test_stats_run.delay", 1, 1)

    def test_increment_counter_with_tags(self):
        given_tags = {"key0": "val0", "key1": "val1", "key2": "val2"}
        self.stats.incr("test_stats_run.delay", tags=given_tags)
        # The expected stat name carries the non-blocked tags as ``,key=value`` suffixes.
        expected_stat = "test_stats_run.delay,key0=val0,key1=val1"
        self.statsd_client.incr.assert_called_once_with(expected_stat, 1, 1)

    def test_does_not_increment_counter_drops_invalid_tags(self):
        given_tags = {"key0,": "val0", "key1": "val1", "key2": "val2", "key3": "val3"}
        self.stats.incr("test_stats_run.delay", tags=given_tags)
        # Only ``key1`` is expected to survive: ``key0,`` contains a comma and the other two
        # tag names are on the block list.
        expected_stat = "test_stats_run.delay,key1=val1"
        self.statsd_client.incr.assert_called_once_with(expected_stat, 1, 1)
def always_invalid(stat_name):
    """Stat-name handler that rejects every name by raising ``InvalidStatsNameException``."""
    message = f"Invalid name: {stat_name}"
    raise InvalidStatsNameException(message)
def always_valid(stat_name):
    """Stat-name handler that accepts every name and returns it unchanged."""
    accepted_name = stat_name
    return accepted_name
class TestCustomStatsName:
    """Check that a configured ``stat_name_handler`` decides whether a metric is emitted."""

    @conf_vars(
        {
            ("metrics", "statsd_on"): "True",
            ("metrics", "stat_name_handler"): "tests.core.test_stats.always_invalid",
        }
    )
    @mock.patch("statsd.StatsClient")
    def test_does_not_send_stats_using_statsd_when_the_name_is_not_valid(self, mock_statsd):
        importlib.reload(airflow.stats)
        airflow.stats.Stats.incr("empty_key")
        # Check the ``incr`` child mock: ``assert_not_called`` on the client instance mock
        # only verifies that the instance itself was never invoked as a callable.
        mock_statsd.return_value.incr.assert_not_called()

    @conf_vars(
        {
            ("metrics", "statsd_datadog_enabled"): "True",
            ("metrics", "metrics_use_pattern_match"): "True",
            ("metrics", "stat_name_handler"): "tests.core.test_stats.always_invalid",
        }
    )
    @mock.patch("datadog.DogStatsd")
    def test_does_not_send_stats_using_dogstatsd_when_the_name_is_not_valid(self, mock_dogstatsd):
        importlib.reload(airflow.stats)
        airflow.stats.Stats.incr("empty_key")
        # Same reasoning as above, for the DogStatsd ``increment`` method.
        mock_dogstatsd.return_value.increment.assert_not_called()

    @conf_vars(
        {
            ("metrics", "statsd_on"): "True",
            ("metrics", "stat_name_handler"): "tests.core.test_stats.always_valid",
        }
    )
    @mock.patch("statsd.StatsClient")
    def test_does_send_stats_using_statsd_when_the_name_is_valid(self, mock_statsd):
        importlib.reload(airflow.stats)
        airflow.stats.Stats.incr("empty_key")
        mock_statsd.return_value.incr.assert_called_once_with("empty_key", 1, 1)

    @conf_vars(
        {
            ("metrics", "statsd_datadog_enabled"): "True",
            ("metrics", "metrics_use_pattern_match"): "True",
            ("metrics", "stat_name_handler"): "tests.core.test_stats.always_valid",
        }
    )
    @mock.patch("datadog.DogStatsd")
    def test_does_send_stats_using_dogstatsd_when_the_name_is_valid(self, mock_dogstatsd):
        importlib.reload(airflow.stats)
        airflow.stats.Stats.incr("empty_key")
        mock_dogstatsd.return_value.increment.assert_called_once_with(
            metric="empty_key", sample_rate=1, tags=[], value=1
        )

    def teardown_method(self) -> None:
        # To avoid side-effect
        importlib.reload(airflow.stats)